[CRIU] [PATCH 1/5] Preparing image receive and send for asynchronous sockets.

rodrigo-bruno rbruno at gsd.inesc-id.pt
Mon May 14 03:29:47 MSK 2018


From: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>

---
 criu/img-remote.c         | 93 +++++++++++++++++++++++++++++----------
 criu/include/img-remote.h | 28 ++++++++++--
 2 files changed, 93 insertions(+), 28 deletions(-)

diff --git a/criu/img-remote.c b/criu/img-remote.c
index f812c52d..70db71e2 100644
--- a/criu/img-remote.c
+++ b/criu/img-remote.c
@@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char *path, char *snapshot_id)
 	buf->nbytes = 0;
 	INIT_LIST_HEAD(&(rimg->buf_head));
 	list_add_tail(&(buf->l), &(rimg->buf_head));
-	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-	rimg->curr_sent_bytes = 0;
 
 	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
 		pr_err("Remote image in_use mutex init failed\n");
@@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct rimage *rimg)
 
 	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
 	rimg->size = 0;
-	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-	rimg->curr_sent_bytes = 0;
 
 	pthread_mutex_unlock(&(rimg->in_use));
 
@@ -669,18 +665,43 @@ err:
 	return NULL;
 }
 
+
+int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
+{
+	int ret;
+	struct roperation *op = malloc(sizeof(struct roperation));
+	bzero(op, sizeof(struct roperation));
+	op->fd = fd;
+	op->rimg = rimg;
+	op->size = size;
+	op->flags = flags;
+	op->close_fd = close_fd;
+	op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	while ((ret = recv_image_async(op)) < 0)
+		if (ret != EAGAIN && ret != EWOULDBLOCK)
+			return -1;
+	return ret;
+}
+
 /* Note: size is a limit on how much we want to read from the socket.  Zero means
  * read until the socket is closed.
  */
-int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
+int64_t recv_image_async(struct roperation *op)
 {
-	struct rbuf *curr_buf = NULL;
+	int fd = op->fd;
+	struct rimage *rimg = op->rimg;
+	uint64_t size = op->size;
+	int flags = op->flags;
+	bool close_fd = op->close_fd;
+	struct rbuf *curr_buf = op->curr_recv_buf;
 	int n;
 
-	if (flags == O_APPEND)
-		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
-	else
-		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	if (curr_buf == NULL) {
+		if (flags == O_APPEND)
+			curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+		else
+			curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	}
 
 	while (1) {
 		n = read(fd,
@@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
 					close(fd);
 				return rimg->size;
 			}
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			return errno;
 		} else {
 			pr_perror("Read on %s:%s socket failed",
 				rimg->path, rimg->snapshot_id);
@@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool c
 
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
 {
+	int ret;
+	struct roperation *op = malloc(sizeof(struct roperation));
+	bzero(op, sizeof(struct roperation));
+	op->fd = fd;
+	op->rimg = rimg;
+	op->flags = flags;
+	op->close_fd = close_fd;
+	op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	while ((ret = send_image_async(op)) < 0)
+		if (ret != EAGAIN && ret != EWOULDBLOCK)
+			return -1;
+	return ret;
+}
 
-	int n, nblocks = 0;
+int64_t send_image_async(struct roperation *op)
+{
+	int fd = op->fd;
+	struct rimage *rimg = op->rimg;
+	int flags = op->flags;
+	bool close_fd = op->close_fd;
+	int n;
 
 	if (flags != O_APPEND) {
-		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-		rimg->curr_sent_bytes = 0;
+		op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		op->curr_sent_bytes = 0;
 	}
 
 	while (1) {
 		n = send(
 		    fd,
-		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
-		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
+		    op->curr_sent_buf->buffer + op->curr_sent_bytes,
+		    min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes,
 		    MSG_NOSIGNAL);
 		if (n > -1) {
-			rimg->curr_sent_bytes += n;
-			if (rimg->curr_sent_bytes == BUF_SIZE) {
-				rimg->curr_sent_buf =
-				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
-				nblocks++;
-				rimg->curr_sent_bytes = 0;
-			} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
+			op->curr_sent_bytes += n;
+			if (op->curr_sent_bytes == BUF_SIZE) {
+				op->curr_sent_buf =
+				    list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
+				op->nblocks++;
+				op->curr_sent_bytes = 0;
+			} else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
 				if (close_fd)
 					close(fd);
-				return nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
+				return op->nblocks*BUF_SIZE + op->curr_sent_buf->nbytes;
 			}
 		} else if (errno == EPIPE || errno == ECONNRESET) {
 			pr_warn("Connection for %s:%s was closed early than expected\n",
 				rimg->path, rimg->snapshot_id);
 			return 0;
-		} else {
+		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+			return errno;
+		}
+		else {
 			pr_perror("Write on %s:%s socket failed",
 				rimg->path, rimg->snapshot_id);
 			return -1;
diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
index 1771d310..0947e7f0 100644
--- a/criu/include/img-remote.h
+++ b/criu/include/img-remote.h
@@ -36,10 +36,6 @@ struct rimage {
 	char snapshot_id[PATHLEN];
 	struct list_head l;
 	struct list_head buf_head;
-	/* Used to track already sent buffers when the image is appended. */
-	struct rbuf *curr_sent_buf;
-	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
-	int curr_sent_bytes;
 	uint64_t size; /* number of bytes */
 	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
 };
@@ -57,6 +53,28 @@ struct wthread {
 	sem_t wakeup_sem;
 };
 
+/* Structure that describes the state of a remote operation on remote images. */
+struct roperation {
+	/* File descriptor being used. */
+	int fd;
+	/* Remote image being used. */
+	struct rimage *rimg;
+	/* Flags for the operation. */
+	int flags;
+	/* If fd should be closed when the operation is done. */
+	bool close_fd;
+	/* Note: recv operation only. How much bytes should be received. */
+	uint64_t size;
+	/* Note: recv operation only. Buffer being writen. */
+	struct rbuf *curr_recv_buf;
+	/* Note: send operation only. Number of blocks already sent. */
+	int nblocks;
+	/* Note: send operation only. Pointer to buffer being sent. */
+	struct rbuf *curr_sent_buf;
+	/* Note: send operation only. Number of bytes sent in 'curr_send_buf. */
+	uint64_t curr_sent_bytes;
+};
+
 /* This variable is used to indicate when the dump is finished. */
 extern bool finished;
 /* This is the proxy to cache TCP socket FD. */
@@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
 
 int64_t forward_image(struct rimage *rimg);
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
+int64_t send_image_async(struct roperation *op);
 int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
+int64_t recv_image_async(struct roperation *op);
 
 int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
 int64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);
-- 
2.17.0



More information about the CRIU mailing list