[CRIU] [PATCH 3/3] page-pipe: allow to handle pipes in batch mode

Andrey Vagin avagin at openvz.org
Mon Jan 27 05:38:49 PST 2014


The problem is that vmsplice() to a big pipe fails very often.

The kernel allocates the linear chunk of memory for pipe buffer
descriptos, but big allocation in kernel can fail.

So we need to restrict maximal capacity of pipes. But the number of
pipes is restricted too, so we need to dump memory in a batch mode.

TODO: send all pipes to parasite in one request
Signed-off-by: Andrey Vagin <avagin at openvz.org>
---
 include/page-pipe.h |  14 ++++++++
 page-pipe.c         | 100 +++++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 110 insertions(+), 4 deletions(-)

diff --git a/include/page-pipe.h b/include/page-pipe.h
index 393627b..78c66b2 100644
--- a/include/page-pipe.h
+++ b/include/page-pipe.h
@@ -78,6 +78,9 @@ struct page_pipe_buf {
 	struct list_head l;	/* links into page_pipe->bufs */
 };
 
+struct page_pipe;
+typedef int (page_pipe_batch_cb)(struct page_pipe *pp, void *args);
+
 struct page_pipe {
 	unsigned int nr_pipes;	/* how many page_pipe_bufs in there */
 	struct list_head bufs;	/* list of bufs */
@@ -89,6 +92,12 @@ struct page_pipe {
 	unsigned int nr_holes;	/* number of holes allocated */
 	unsigned int free_hole;	/* number of holes in use */
 	struct iovec *holes;	/* holes */
+
+	unsigned int pipe_max_size; /* max allowable pipe capacity */
+	int pipe_nr;		/* number of avaliable pipes */
+
+	page_pipe_batch_cb *batch_cb; /* callback to handle batch requests */
+	void *batch_args;	/* arguments for the callback */
 };
 
 extern struct page_pipe *create_page_pipe(unsigned int nr, struct iovec *);
@@ -97,5 +106,10 @@ extern int page_pipe_add_page(struct page_pipe *p, unsigned long addr);
 extern int page_pipe_add_hole(struct page_pipe *p, unsigned long addr);
 
 extern void debug_show_page_pipe(struct page_pipe *pp);
+void page_pipe_close_buf(struct page_pipe *pp, struct page_pipe_buf *ppb);
+void page_pipe_cleanup(struct page_pipe *pp);
+void page_pipe_start_batching(struct page_pipe *pp,
+				page_pipe_batch_cb *cb, void *args);
+int page_pipe_complete_batch(struct page_pipe *pp);
 
 #endif /* __CR_PAGE_PIPE_H__ */
diff --git a/page-pipe.c b/page-pipe.c
index bfe1989..97317d0 100644
--- a/page-pipe.c
+++ b/page-pipe.c
@@ -1,5 +1,6 @@
 #include <unistd.h>
 #include <fcntl.h>
+#include <limits.h>
 
 #undef LOG_PREFIX
 #define LOG_PREFIX "page-pipe: "
@@ -7,6 +8,31 @@
 #include "util.h"
 #include "page-pipe.h"
 
+static int open_pipe(struct page_pipe *pp, int *fd)
+{
+	BUG_ON(pp->pipe_nr <= 0);
+
+	if (pipe(fd)) {
+		pr_perror("Can't make pipe for page-pipe");
+		return -1;
+	}
+
+	pp->pipe_nr--;
+
+	return 0;
+}
+
+void page_pipe_close_buf(struct page_pipe *pp, struct page_pipe_buf *ppb)
+{
+	if (ppb->p[0] == -1)
+		return;
+
+	close_safe(&ppb->p[0]);
+	close_safe(&ppb->p[1]);
+
+	pp->pipe_nr++;
+}
+
 static int page_pipe_grow(struct page_pipe *pp)
 {
 	struct page_pipe_buf *ppb;
@@ -17,7 +43,7 @@ static int page_pipe_grow(struct page_pipe *pp)
 	if (!ppb)
 		return -1;
 
-	if (pipe(ppb->p)) {
+	if (open_pipe(pp, ppb->p)) {
 		xfree(ppb);
 		pr_perror("Can't make pipe for page-pipe");
 		return -1;
@@ -42,7 +68,6 @@ struct page_pipe *create_page_pipe(unsigned int nr_segs, struct iovec *iovs)
 
 	pp = xmalloc(sizeof(*pp));
 	if (pp) {
-		pp->nr_pipes = 0;
 		INIT_LIST_HEAD(&pp->bufs);
 		pp->nr_iovs = nr_segs;
 		pp->iovs = iovs;
@@ -52,6 +77,9 @@ struct page_pipe *create_page_pipe(unsigned int nr_segs, struct iovec *iovs)
 		pp->free_hole = 0;
 		pp->holes = NULL;
 
+		pp->pipe_nr = INT_MAX;
+		pp->pipe_max_size = UINT_MAX;
+
 		if (page_pipe_grow(pp))
 			return NULL;
 	}
@@ -59,6 +87,44 @@ struct page_pipe *create_page_pipe(unsigned int nr_segs, struct iovec *iovs)
 	return pp;
 }
 
+/* The number of pipes for one batch request */
+#define NR_PIPES_PER_BATCH 8
+
+#define PAGE_ALLOC_COSTLY_ORDER 3 /* from the kernel source code */
+struct kernel_pipe_buffer {
+        struct page *page;
+        unsigned int offset, len;
+        const struct pipe_buf_operations *ops;
+        unsigned int flags;
+        unsigned long private;
+};
+
+void page_pipe_start_batching(struct page_pipe *pp,
+				page_pipe_batch_cb *cb, void *args)
+{
+	pp->pipe_nr	= NR_PIPES_PER_BATCH;
+	/*
+	 * The kernel allocates the linear chunk of memory for pipe buffers.
+	 * Allocation of chunks with size more than PAGE_ALLOC_COSTLY_ORDER
+	 * fails very often, so we need to restrict the pipe capacity to not
+	 * allocate big chunks.
+	 */
+	pp->pipe_max_size = 8 * PAGE_SIZE / sizeof(struct kernel_pipe_buffer);
+
+	pp->batch_cb	= cb;
+	pp->batch_args	= args;
+}
+
+int page_pipe_complete_batch(struct page_pipe *pp)
+{
+	int ret;
+
+	ret = pp->batch_cb(pp, pp->batch_args);
+	page_pipe_cleanup(pp);
+
+	return ret;
+}
+
 void destroy_page_pipe(struct page_pipe *pp)
 {
 	struct page_pipe_buf *ppb, *n;
@@ -66,14 +132,28 @@ void destroy_page_pipe(struct page_pipe *pp)
 	pr_debug("Killing page pipe\n");
 
 	list_for_each_entry_safe(ppb, n, &pp->bufs, l) {
-		close(ppb->p[0]);
-		close(ppb->p[1]);
+		page_pipe_close_buf(pp, ppb);
 		xfree(ppb);
 	}
 
 	xfree(pp);
 }
 
+void page_pipe_cleanup(struct page_pipe *pp)
+{
+	struct page_pipe_buf *ppb, *n;
+
+	pr_debug("Clean up page pipe\n");
+
+	list_for_each_entry_safe(ppb, n, &pp->bufs, l) {
+		page_pipe_close_buf(pp, ppb);
+		list_del(&ppb->l);
+		xfree(ppb);
+	}
+
+	pp->free_hole = 0;
+}
+
 #define PPB_IOV_BATCH	8
 
 static inline int try_add_page_to(struct page_pipe *pp, struct page_pipe_buf *ppb,
@@ -84,6 +164,9 @@ static inline int try_add_page_to(struct page_pipe *pp, struct page_pipe_buf *pp
 	if (ppb->pages_in == ppb->pipe_size) {
 		int ret;
 
+		if ((ppb->pipe_size << 1) > pp->pipe_max_size)
+			return 1;
+
 		ret = fcntl(ppb->p[0], F_SETPIPE_SZ, (ppb->pipe_size * PAGE_SIZE) << 1);
 		if (ret < 0)
 			return 1; /* need to add another buf */
@@ -134,6 +217,15 @@ int page_pipe_add_page(struct page_pipe *pp, unsigned long addr)
 	if (ret <= 0)
 		return ret;
 
+	if (pp->pipe_nr == 0) {
+		ret = pp->batch_cb(pp, pp->batch_args);
+		page_pipe_cleanup(pp);
+
+		if (ret)
+			return ret;
+
+	}
+
 	ret = page_pipe_grow(pp);
 	if (ret < 0)
 		return ret;
-- 
1.8.3.1



More information about the CRIU mailing list