[CRIU] [PATCH 3/5] Non-blocking implementation of img cache and proxy (seems to be working).

rodrigo-bruno rbruno at gsd.inesc-id.pt
Mon May 14 03:29:49 MSK 2018


From: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>

---
 criu/img-cache.c          | 137 +-----
 criu/img-proxy.c          |  65 +--
 criu/img-remote.c         | 961 +++++++++++++++++++++++---------------
 criu/include/img-remote.h |  53 +--
 4 files changed, 629 insertions(+), 587 deletions(-)

diff --git a/criu/img-cache.c b/criu/img-cache.c
index 7b828b9b..c941f14e 100644
--- a/criu/img-cache.c
+++ b/criu/img-cache.c
@@ -8,105 +8,25 @@
 #include <fcntl.h>
 #include "cr_options.h"
 
-static struct rimage *wait_for_image(struct wthread *wt)
+int accept_proxy_to_cache(int sockfd)
 {
-	struct rimage *result;
+	struct sockaddr_in cli_addr;
+	socklen_t clilen = sizeof(cli_addr);
+	int proxy_fd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
 
-	if (!strncmp(wt->path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
-		finished = true;
-		shutdown(local_req_fd, SHUT_RD);
-		return NULL;
-	}
-
-	result = get_rimg_by_name(wt->snapshot_id, wt->path);
-	if (result != NULL && result->size > 0)
-		return result;
-
-	/* The file does not exist and we do not expect new files */
-	if (finished && !is_receiving())
-		return NULL;
-
-	/* NOTE: at this point, when the thread wakes up, either the image is
-	 * already in memory or it will never come (the dump is finished).
-	 */
-	sem_wait(&(wt->wakeup_sem));
-	result = get_rimg_by_name(wt->snapshot_id, wt->path);
-	if (result != NULL && result->size > 0)
-		return result;
-	else
-		return NULL;
-}
-
-/* The image cache creates a thread that calls this function. It waits for remote
- * images from the image-cache.
- */
-void *accept_remote_image_connections(void *port)
-{
-	int fd = *((int *) port);
-	struct sockaddr_in cli_addr;
-	socklen_t clilen = sizeof(cli_addr);
-	char snapshot_id_buf[PATHLEN], path_buf[PATHLEN];
-	uint64_t size;
-	int64_t ret;
-	int flags, proxy_fd;
-	struct rimage *rimg;
-
-	proxy_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
-	if (proxy_fd < 0) {
-		pr_perror("Unable to accept remote image connection from image proxy");
-		return NULL;
-	}
-	while (1) {
-		ret = read_remote_header(proxy_fd, snapshot_id_buf, path_buf, &flags, &size);
-		if (ret < 0) {
-			pr_perror("Unable to receive remote header from image proxy");
-			return NULL;
-		}
-		/* This means that the no more images are coming. */
-		else if (!ret) {
-			pr_info("Image Proxy connection closed.\n");
-			finished = true;
-			unlock_workers();
-			return NULL;
-		}
-
-		pr_info("Received %s request for %s:%s\n",
-			flags == O_RDONLY ? "read" :
-				flags == O_APPEND ? "append" : "write",
-			path_buf, snapshot_id_buf);
+	if (proxy_fd < 0)
+		pr_perror("Unable to accept remote image connection from image proxy");
+	/* Only a single proxy connection is accepted: the listening socket can go. */
+	close(sockfd);
+
-		rimg = prepare_remote_image(path_buf, snapshot_id_buf, flags);
-
-		prepare_recv_rimg();
-		if (!size)
-			ret = 0;
-		else
-			ret = recv_image(proxy_fd, rimg, size, flags, false);
-		if (ret < 0) {
-			pr_perror("Unable to receive %s:%s from image proxy",
-				rimg->path, rimg->snapshot_id);
-			finalize_recv_rimg(NULL);
-			return NULL;
-		} else if (ret != size) {
-			pr_perror("Unable to receive %s:%s from image proxy (received %ld bytes, expected %lu bytes)",
-				rimg->path, rimg->snapshot_id, (long)ret, (unsigned long)size);
-			finalize_recv_rimg(NULL);
-			return NULL;
-		}
-		finalize_recv_rimg(rimg);
-
-		pr_info("Finished receiving %s:%s (received %ld bytes)\n",
-			rimg->path, rimg->snapshot_id, (long)ret);
-	}
+	return proxy_fd;
 }
 
 int image_cache(bool background, char *local_cache_path, unsigned short cache_write_port)
 {
-	pthread_t local_req_thr, remote_req_thr;
-
 	pr_info("Proxy to Cache Port %d, CRIU to Cache Path %s\n",
 			cache_write_port, local_cache_path);
-
+	restoring = true;
 
 	if (opts.ps_socket != -1) {
 		proxy_to_cache_fd = opts.ps_socket;
@@ -117,40 +37,29 @@ int image_cache(bool background, char *local_cache_path, unsigned short cache_wr
 			pr_perror("Unable to open proxy to cache TCP socket");
 			return -1;
 		}
+		/* Block until the image proxy connects. */
+		proxy_to_cache_fd = accept_proxy_to_cache(proxy_to_cache_fd);
+		if (proxy_to_cache_fd < 0)
+			return -1; /* TODO - should close other sockets. */
 	}
 
+	pr_info("Cache is connected to Proxy through fd %d\n", proxy_to_cache_fd);
+
 	local_req_fd = setup_UNIX_server_socket(local_cache_path);
 	if (local_req_fd < 0) {
 		pr_perror("Unable to open cache to proxy UNIX socket");
-		return -1;
-	}
-
-	socket_set_non_blocking(local_req_fd);
+		return -1; /* TODO - proxy_to_cache_fd is not closed on this path. */
 
-	if (init_daemon(background, wait_for_image)) {
-		pr_perror("Unable to initialize daemon");
-		return -1;
 	}
 
-	if (pthread_create(
-		&remote_req_thr,
-		NULL, accept_remote_image_connections,
-		(void *) &proxy_to_cache_fd)) {
-		pr_perror("Unable to create remote requests thread");
-		return -1;
-	}
-	if (pthread_create(
-		&local_req_thr,
-		NULL,
-		accept_local_image_connections,
-		(void *) &local_req_fd)) {
-		pr_perror("Unable to create local requests thread");
-		return -1;
+	if (background) {
+		if (daemon(1, 0) == -1) {
+			pr_perror("Can't run service server in the background");
+			return -1;
+		}
 	}
 
-	pthread_join(remote_req_thr, NULL);
-	pthread_join(local_req_thr, NULL);
-	join_workers();
+	accept_image_connections();
 	pr_info("Finished image cache.");
 	return 0;
 }
diff --git a/criu/img-proxy.c b/criu/img-proxy.c
index b63d69a0..9551a7dc 100644
--- a/criu/img-proxy.c
+++ b/criu/img-proxy.c
@@ -8,51 +8,11 @@
 #include <sys/socket.h>
 #include "cr_options.h"
 
-static struct rimage *wait_for_image(struct wthread *wt)
-{
-	return get_rimg_by_name(wt->snapshot_id, wt->path);
-}
-
-int64_t forward_image(struct rimage *rimg)
-{
-	int64_t ret;
-	int fd = proxy_to_cache_fd;
-
-	pthread_mutex_lock(&(rimg->in_use));
-	pr_info("Forwarding %s:%s (%lu bytes)\n",
-	    rimg->path, rimg->snapshot_id, (unsigned long)rimg->size);
-	if (write_remote_header(
-		fd, rimg->snapshot_id, rimg->path, O_APPEND, rimg->size) < 0) {
-		pr_perror("Error writing header for %s:%s",
-			rimg->path, rimg->snapshot_id);
-		pthread_mutex_unlock(&(rimg->in_use));
-		return -1;
-	}
-
-	ret = send_image(fd, rimg, O_APPEND, false);
-	if (ret < 0) {
-		pr_perror("Unable to send %s:%s to image cache",
-			rimg->path, rimg->snapshot_id);
-		pthread_mutex_unlock(&(rimg->in_use));
-		return -1;
-	} else if (ret != rimg->size) {
-		pr_perror("Unable to send %s:%s to image proxy (sent %ld bytes, expected %lu bytes",
-		    rimg->path, rimg->snapshot_id, (long)ret, (unsigned long)rimg->size);
-		pthread_mutex_unlock(&(rimg->in_use));
-		return -1;
-	}
-	pr_info("Finished forwarding %s:%s (sent %lu bytes)\n",
-	    rimg->path, rimg->snapshot_id, (unsigned long)rimg->size);
-	pthread_mutex_unlock(&(rimg->in_use));
-	return ret;
-}
-
 int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigned short fwd_port)
 {
-	pthread_t local_req_thr;
-
 	pr_info("CRIU to Proxy Path: %s, Cache Address %s:%hu\n",
 		local_proxy_path, fwd_host, fwd_port);
+	restoring = false;
 
 	local_req_fd = setup_UNIX_server_socket(local_proxy_path);
 	if (local_req_fd < 0) {
@@ -60,8 +20,6 @@ int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigne
 		return -1;
 	}
 
-	socket_set_non_blocking(local_req_fd);
-
 	if (opts.ps_socket != -1) {
 		proxy_to_cache_fd = opts.ps_socket;
 		pr_info("Re-using ps socket %d\n", proxy_to_cache_fd);
@@ -69,24 +27,21 @@ int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigne
 		proxy_to_cache_fd = setup_TCP_client_socket(fwd_host, fwd_port);
 		if (proxy_to_cache_fd < 0) {
 			pr_perror("Unable to open proxy to cache TCP socket");
-			return -1;
+			return -1; /* TODO - local_req_fd is not closed on this path. */
 		}
 	}
 
-	if (init_daemon(background, wait_for_image))
-		return -1;
+	pr_info("Proxy is connected to Cache through fd %d\n", proxy_to_cache_fd);
 
-	if (pthread_create(
-	    &local_req_thr,
-	    NULL,
-	    accept_local_image_connections,
-	    (void *) &local_req_fd)) {
-		pr_perror("Unable to create local requests thread");
-		return -1;
+	if (background) {
+		if (daemon(1, 0) == -1) {
+			pr_perror("Can't run service server in the background");
+			return -1;
+		}
 	}
 
-	pthread_join(local_req_thr, NULL);
-	join_workers();
+	/* TODO - pass local_req_fd and proxy_to_cache_fd as arguments. */
+	accept_image_connections();
 	pr_info("Finished image proxy.");
 	return 0;
 }
diff --git a/criu/img-remote.c b/criu/img-remote.c
index 4c566450..ec69bc02 100644
--- a/criu/img-remote.c
+++ b/criu/img-remote.c
@@ -27,29 +27,35 @@
 #define PB_LOCAL_IMAGE_SIZE PATHLEN
 #define EPOLL_MAX_EVENTS 50
 
-static char *snapshot_id;
-bool restoring = true;
-
+// List of images already in memory.
 LIST_HEAD(rimg_head);
-pthread_mutex_t rimg_lock = PTHREAD_MUTEX_INITIALIZER;
 
-pthread_mutex_t proxy_to_cache_lock = PTHREAD_MUTEX_INITIALIZER;
+// Operations (local, or coming from the proxy) currently being transferred.
+LIST_HEAD(rop_inprogress);
 
-LIST_HEAD(workers_head);
-pthread_mutex_t workers_lock = PTHREAD_MUTEX_INITIALIZER;
-sem_t workers_semph;
+// List of local operations pending (reads on the restore side for images that
+// have not arrived yet).
 
-struct rimage * (*wait_for_image) (struct wthread *wt);
+LIST_HEAD(rop_pending);
+// List of images waiting to be forwarded. The head of the list is currently
+// being forwarded.
+LIST_HEAD(rop_forwarding);
+
+// List of snapshots (useful when doing incremental restores/dumps).
+LIST_HEAD(snapshot_head);
 
-bool finished = false;
-int writing = 0;
-int forwarding = 0;
+static char *snapshot_id;
+bool restoring = true; // true in the image cache (restore side), false in the image proxy (dump side).
+// TODO - split this into two vars, recv_from_proxy, send_to_cache
+bool forwarding = false; // true while an image body is in transit on proxy_to_cache_fd.
+bool finished_local = false; // set by finish_local(): no more local connections are accepted.
+bool finished_remote = false; // set once the image proxy closes its connection.
 int proxy_to_cache_fd;
 int local_req_fd;
+int epoll_fd; // epoll instance created by accept_image_connections().
+struct epoll_event *events; // event buffer passed to epoll_wait().
 
 
-LIST_HEAD(snapshot_head);
-
 /* A snapshot is a dump or pre-dump operation. Each snapshot is identified by an
  * ID which corresponds to the working directory specefied by the user.
  */
@@ -79,125 +85,27 @@ struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path)
 {
 	struct rimage *rimg = NULL;
 
-	pthread_mutex_lock(&rimg_lock);
 	list_for_each_entry(rimg, &rimg_head, l) {
 		if (!strncmp(rimg->path, path, PATHLEN) &&
 		    !strncmp(rimg->snapshot_id, snapshot_id, PATHLEN)) {
-			pthread_mutex_unlock(&rimg_lock);
 			return rimg;
 		}
 	}
-	pthread_mutex_unlock(&rimg_lock);
-	return NULL;
-}
-
-static struct wthread *get_wt_by_name(const char *snapshot_id, const char *path)
-{
-	struct wthread *wt = NULL;
-
-	pthread_mutex_lock(&workers_lock);
-	list_for_each_entry(wt, &workers_head, l) {
-		if (!strncmp(wt->path, path, PATHLEN) &&
-		   !strncmp(wt->snapshot_id, snapshot_id, PATHLEN)) {
-			pthread_mutex_unlock(&workers_lock);
-			return wt;
-		}
-	}
-	pthread_mutex_unlock(&workers_lock);
 	return NULL;
 }
 
-static int init_sync_structures(void)
-{
-	if (sem_init(&workers_semph, 0, 0) != 0) {
-		pr_perror("Workers semaphore init failed");
-		return -1;
-	}
-
-	return 0;
-}
-
-void prepare_recv_rimg(void)
-{
-	pthread_mutex_lock(&rimg_lock);
-	writing++;
-	pthread_mutex_unlock(&rimg_lock);
-}
-
-void finalize_recv_rimg(struct rimage *rimg)
-{
-
-	pthread_mutex_lock(&rimg_lock);
-
-	if (rimg)
-		list_add_tail(&(rimg->l), &rimg_head);
-	writing--;
-	pthread_mutex_unlock(&rimg_lock);
-	/* Wake thread waiting for this image. */
-	if (rimg) {
-		struct wthread *wt = get_wt_by_name(rimg->snapshot_id, rimg->path);
-		if (wt)
-			sem_post(&(wt->wakeup_sem));
-	}
-}
-
-bool is_receiving(void)
-{
-	int ret;
-
-	pthread_mutex_lock(&rimg_lock);
-	ret = writing;
-	pthread_mutex_unlock(&rimg_lock);
-	return ret > 0;
-}
-
-static void prepare_fwd_rimg(void)
-{
-	pthread_mutex_lock(&rimg_lock);
-	forwarding++;
-	pthread_mutex_unlock(&rimg_lock);
-}
-
-static void finalize_fwd_rimg(void)
-{
-	pthread_mutex_lock(&rimg_lock);
-	forwarding--;
-	pthread_mutex_unlock(&rimg_lock);
-}
-
-static bool is_forwarding(void)
-{
-	int ret;
-
-	pthread_mutex_lock(&rimg_lock);
-	ret = forwarding;
-	pthread_mutex_unlock(&rimg_lock);
-	return ret > 0;
-}
-
-/* This function is called when no more images are coming. Threads still waiting
- * for images will be awaken to send a ENOENT (no such file) to the requester.
- */
-void unlock_workers(void)
+struct roperation *get_rop_by_name(
+    struct list_head *head, const char *snapshot_id, const char *path)
 {
-	struct wthread *wt = NULL;
-
-	pthread_mutex_lock(&workers_lock);
-	list_for_each_entry(wt, &workers_head, l)
-		sem_post(&(wt->wakeup_sem));
-	pthread_mutex_unlock(&workers_lock);
-}
+	struct roperation *rop = NULL;
 
-int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*))
-{
-	if (background) {
-		if (daemon(1, 0) == -1) {
-			pr_perror("Can't run service server in the background");
-			return -1;
+	list_for_each_entry(rop, head, l) {
+		if (!strncmp(rop->path, path, PATHLEN) &&
+		    !strncmp(rop->snapshot_id, snapshot_id, PATHLEN)) {
+			return rop;
 		}
 	}
-	wait_for_image = wfi;
-	return init_sync_structures();
+	return NULL;
 }
 
 int setup_TCP_server_socket(int port)
@@ -278,11 +186,15 @@ err:
 
 int event_set(int epoll_fd, int op, int fd, uint32_t events, void *data)
 {
+	int ret;
 	struct epoll_event event;
 	event.events = events;
 	event.data.ptr = data;
-	// TODO - check if this is okay to send a stack allocated object!
-	return epoll_ctl(epoll_fd, op, fd, &event);
+
+	ret = epoll_ctl(epoll_fd, op, fd, &event);
+	if (ret)
+		pr_perror("[fd=%d] Unable to set event", fd);
+	return ret;
 }
 
 void socket_set_non_blocking(int fd)
@@ -299,6 +211,20 @@ void socket_set_non_blocking(int fd)
 		pr_perror("Failed to set flags for fd %d", fd);
 }
 
+/* Counterpart of socket_set_non_blocking(): clear O_NONBLOCK; errors are only logged. */
+void socket_set_blocking(int fd)
+{
+	int flags = fcntl(fd, F_GETFL, NULL);
+
+	if (flags < 0) {
+		pr_perror("Failed to obtain flags from fd %d", fd);
+		return;
+	}
+	flags &= ~O_NONBLOCK;
+	if (fcntl(fd, F_SETFL, flags) < 0)
+		pr_perror("Failed to set flags for fd %d", fd);
+}
+
 int setup_UNIX_server_socket(char *path)
 {
 	struct sockaddr_un addr;
@@ -441,48 +367,10 @@ int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *flags, ui
 	return ret;
 }
 
-static struct wthread *new_worker(void)
-{
-	struct wthread *wt = malloc(sizeof(struct wthread));
-
-	if (!wt) {
-		pr_perror("Unable to allocate worker thread structure");
-		goto err;
-	}
-	if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
-		pr_perror("Workers semaphore init failed");
-		goto err;
-	}
-	return wt;
-err:
-	free(wt);
-	return NULL;
-}
-
-static void add_worker(struct wthread *wt)
-{
-	pthread_mutex_lock(&workers_lock);
-	list_add_tail(&(wt->l), &workers_head);
-	pthread_mutex_unlock(&workers_lock);
-	sem_post(&workers_semph);
-}
-
-void join_workers(void)
-{
-	struct wthread *wthread = NULL;
-
-	while (! list_empty(&workers_head)) {
-		wthread = list_entry(workers_head.next, struct wthread, l);
-		pthread_join(wthread->tid, NULL);
-		list_del(&(wthread->l));
-		free(wthread);
-	}
-}
-
 static struct rimage *new_remote_image(char *path, char *snapshot_id)
 {
-	struct rimage *rimg = malloc(sizeof(struct rimage));
-	struct rbuf *buf = malloc(sizeof(struct rbuf));
+	struct rimage *rimg = calloc(1, sizeof(struct rimage));
+	struct rbuf *buf = calloc(1, sizeof(struct rbuf));
 
 	if (rimg == NULL || buf == NULL) {
 		pr_perror("Unable to allocate remote image structures");
@@ -490,18 +378,13 @@ static struct rimage *new_remote_image(char *path, char *snapshot_id)
 	}
 
 	strncpy(rimg->path, path, PATHLEN -1 );
-	rimg->path[PATHLEN - 1] = '\0';
 	strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
+	rimg->path[PATHLEN - 1] = '\0';
 	rimg->snapshot_id[PATHLEN - 1] = '\0';
-	rimg->size = 0;
-	buf->nbytes = 0;
 	INIT_LIST_HEAD(&(rimg->buf_head));
 	list_add_tail(&(buf->l), &(rimg->buf_head));
+	rimg->curr_fwd_buf = buf;
 
-	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
-		pr_err("Remote image in_use mutex init failed\n");
-		goto err;
-	}
 	return rimg;
 err:
 	free(rimg);
@@ -509,11 +392,56 @@ err:
 	return NULL;
 }
 
+/* Allocate a zeroed operation for path:snapshot_id on cli_fd; NULL on failure. */
+static struct roperation *new_remote_operation(
+	char *path, char *snapshot_id, int cli_fd, int flags, bool close_fd)
+{
+	struct roperation *rop = calloc(1, sizeof(*rop));
+
+	if (rop == NULL) {
+		pr_perror("Unable to allocate remote operation structures");
+		return NULL;
+	}
+	strncpy(rop->path, path, PATHLEN - 1);
+	strncpy(rop->snapshot_id, snapshot_id, PATHLEN - 1);
+	rop->path[PATHLEN - 1] = '\0';
+	rop->snapshot_id[PATHLEN - 1] = '\0';
+	rop->fd = cli_fd;
+	rop->flags = flags;
+	rop->close_fd = close_fd;
+	return rop;
+}
+
+/*
+ * Bind @rop to @rimg and position its send/receive cursors according to rop->flags.
+ */
+static void rop_set_rimg(struct roperation *rop, struct rimage *rimg)
+{
+	rop->rimg = rimg;
+	rop->size = rimg->size;
+	if (rop->flags == O_APPEND) {
+		if (rop->fd == proxy_to_cache_fd) {
+			/* Forwarding an append resumes where the previous forward stopped. */
+			rop->curr_sent_buf = rimg->curr_fwd_buf;
+			rop->curr_sent_bytes = rimg->curr_fwd_bytes;
+		} else {
+			/* Other appends continue at the current end of the image. */
+			rop->curr_sent_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+			rop->curr_sent_bytes = rop->curr_sent_buf->nbytes;
+		}
+		/* On the receiving side we simply append to the last buffer. */
+		rop->curr_recv_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+	} else {
+		/* Plain writes and reads start from the first buffer. */
+		rop->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		rop->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		rop->curr_sent_bytes = 0;
+	}
+}
+
 /* Clears a remote image struct for reusing it. */
 static struct rimage *clear_remote_image(struct rimage *rimg)
 {
-	pthread_mutex_lock(&(rimg->in_use));
-
 	while (!list_is_singular(&(rimg->buf_head))) {
 		struct rbuf *buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
 
@@ -524,179 +452,428 @@ static struct rimage *clear_remote_image(struct rimage *rimg)
 	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
 	rimg->size = 0;
 
-	pthread_mutex_unlock(&(rimg->in_use));
-
 	return rimg;
 }
 
-struct rimage *prepare_remote_image(char *path, char *snapshot_id, int open_mode)
+void handle_accept_write(int cli_fd, char *snapshot_id, char *path, int flags,
+			 bool close_fd, uint64_t size)
 {
+	struct roperation *rop = NULL;
 	struct rimage *rimg = get_rimg_by_name(snapshot_id, path);
-	/* There is no record of such image, create a new one. */
 
-	if (rimg == NULL)
-		return new_remote_image(path, snapshot_id);
-
-	pthread_mutex_lock(&rimg_lock);
-	list_del(&(rimg->l));
-	pthread_mutex_unlock(&rimg_lock);
+	rop = new_remote_operation(path, snapshot_id, cli_fd, flags, close_fd);
+	if (rop == NULL) {
+		pr_perror("Error preparing remote operation");
+		goto err;
+	}
+
+	if (rimg == NULL) {
+		rimg = new_remote_image(path, snapshot_id);
+		if (rimg == NULL) {
+			pr_perror("Error preparing remote image");
+			goto err;
+		}
+	} else {
+		/* Detached while receiving; appends keep the old data, other writes reset it. */
+		list_del(&(rimg->l));
+		if (flags != O_APPEND)
+			clear_remote_image(rimg);
+	}
+	rop_set_rimg(rop, rimg);
+	rop->size = size;
+	list_add_tail(&(rop->l), &rop_inprogress);
+	event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
+	return;
+err:
+	free(rop);
+	if (close_fd)
+		close(cli_fd);
+}
 
-	/* There is already an image record. Simply return it for appending. */
-	if (open_mode == O_APPEND)
-		return rimg;
-	/* There is already an image record. Clear it for writing. */
-	else
-		return clear_remote_image(rimg);
+void handle_accept_proxy_write(
+	int cli_fd, char* snapshot_id, char* path, int flags)
+{
+	handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
 }
 
-static void *process_local_read(struct wthread *wt)
+/* Image proxy: serve a local read from memory, or reply ENOENT (never waits). */
+void handle_accept_proxy_read(int cli_fd, char *snapshot_id, char *path, int flags)
 {
+	struct roperation *rop = NULL;
 	struct rimage *rimg = NULL;
-	int64_t ret;
-	/* TODO - split wait_for_image
-	 * in cache - improve the parent stuf
-	 * in proxy - do not wait for anything, return no file
-	 */
-	rimg = wait_for_image(wt);
-	if (!rimg) {
-		pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
-		if (write_reply_header(wt->fd, ENOENT) < 0)
+
+	rimg = get_rimg_by_name(snapshot_id, path);
+
+	/* Check if we already have the image. */
+	if (rimg == NULL) {
+		pr_info("No image %s:%s.\n", path, snapshot_id);
+		if (write_reply_header(cli_fd, ENOENT) < 0)
 			pr_perror("Error writing reply header for unexisting image");
-		close(wt->fd);
-		return NULL;
-	} else {
-		if (write_reply_header(wt->fd, 0) < 0) {
+		/* Nothing follows the ENOENT reply, so the client fd is done either way. */
+		goto err;
+	} else {
+		/* Acknowledge, then let the event loop stream the image back. */
+		if (write_reply_header(cli_fd, 0) < 0) {
 			pr_perror("Error writing reply header for %s:%s",
-					wt->path, wt->snapshot_id);
-			close(wt->fd);
-			return NULL;
+				path, snapshot_id);
+			goto err;
 		}
-	}
 
-	pthread_mutex_lock(&(rimg->in_use));
-	ret = send_image(wt->fd, rimg, wt->flags, true);
-	if (ret < 0)
-		pr_perror("Unable to send %s:%s to CRIU (sent %ld bytes)",
-				rimg->path, rimg->snapshot_id, (long)ret);
-	else
-		pr_info("Finished sending %s:%s to CRIU (sent %ld bytes)\n",
-				rimg->path, rimg->snapshot_id, (long)ret);
-	pthread_mutex_unlock(&(rimg->in_use));
-	return NULL;
+		rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
+		if (rop == NULL) {
+			pr_perror("Error preparing remote operation");
+			goto err;
+		}
+		rop_set_rimg(rop, rimg);
+		list_add_tail(&(rop->l), &rop_inprogress);
+		event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+	}
+	return;
+err:
+	close(cli_fd);
 }
 
-static void *process_local_image_connection(void *ptr)
+void finish_local(void)
 {
-	struct wthread *wt = (struct wthread *) ptr;
-	struct rimage *rimg = NULL;
-	int64_t ret;
+	int ret;
+
+	finished_local = true;	/* no new local (CRIU) connections from now on */
+	/* TODO - should local_req_fd also be shutdown(SHUT_RD)? */
+	ret = event_set(epoll_fd, EPOLL_CTL_DEL, local_req_fd, 0, 0);
+	if (ret)
+		pr_perror("Failed to del local fd from epoll");
+}
 
-	/* NOTE: the code inside this if is shared for both cache and proxy. */
-	if (wt->flags == O_RDONLY)
-		return process_local_read(wt);
+/* Image cache: serve a local read, park it until the image arrives, or reply ENOENT. */
+void handle_accept_cache_read(int cli_fd, char *snapshot_id, char *path, int flags)
 {
+	struct rimage *rimg = NULL;
+	struct roperation *rop = NULL;
 
-	/* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
-	 * through TCP (see accept_remote_image_connections).
-	 */
-	rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
-	ret = recv_image(wt->fd, rimg, 0, wt->flags, true);
-	if (ret < 0) {
-		pr_perror("Unable to receive %s:%s to CRIU (received %ld bytes)",
-				rimg->path, rimg->snapshot_id, (long)ret);
-		finalize_recv_rimg(NULL);
-		return NULL;
+	/* RESTORE_FINISH is a marker, not an image: stop accepting local requests. */
+	if (!strncmp(path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
+		close(cli_fd);
+		finish_local();
+		return;
 	}
-	finalize_recv_rimg(rimg);
-	pr_info("Finished receiving %s:%s (received %ld bytes)\n",
-			rimg->path, rimg->snapshot_id, (long)ret);
 
+	rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
+	if (rop == NULL) {
+		pr_perror("Error preparing remote operation");
+		close(cli_fd);
+		return;
+	}
 
-	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
-		finished = true;
-		shutdown(local_req_fd, SHUT_RD);
-	} else {
-		pthread_mutex_lock(&proxy_to_cache_lock);
-		ret = forward_image(rimg);
-		pthread_mutex_unlock(&proxy_to_cache_lock);
+	/* Image already received: acknowledge it and let the event loop stream it. */
+	rimg = get_rimg_by_name(snapshot_id, path);
+	if (rimg != NULL && rimg->size > 0) {
+		if (write_reply_header(cli_fd, 0) < 0) {
+			pr_perror("Error writing reply header for %s:%s", path, snapshot_id);
+			close(cli_fd);
+			free(rop);
+			return;
+		}
+		rop_set_rimg(rop, rimg);
+		list_add_tail(&(rop->l), &rop_inprogress);
+		event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
 	}
+	/* Not here yet, but the proxy may still send it: park the request. */
+	else if (!finished_remote) {
+		list_add_tail(&(rop->l), &rop_pending);
+	}
+	/* The proxy has closed its connection, so the image will never arrive. */
+	else {
+		pr_info("No image %s:%s.\n", path, snapshot_id);
+		if (write_reply_header(cli_fd, ENOENT) < 0)
+			pr_perror("Error writing reply header for unexisting image");
+		close(cli_fd);
+		free(rop);
+	}
+}
 
-	finalize_fwd_rimg();
-	if (ret < 0) {
-		pr_perror("Unable to forward %s:%s to Image Cache",
-				rimg->path, rimg->snapshot_id);
+/* Write the remote header for @rop; the event loop then streams the image body. */
+void forward_remote_image(struct roperation *rop)
+{
+	int64_t ret;	/* signed: header write errors are reported as < 0 */
+	/* TODO - test: socket_set_blocking(rop->fd) while the header is written. */
 
-		return NULL;
+	ret = write_remote_header(
+		rop->fd, rop->snapshot_id, rop->path, rop->flags, rop->size);
+
+	if (ret < 0) {
+		pr_perror("Error writing header for %s:%s",
+			rop->path, rop->snapshot_id);
+		return;
 	}
 
-	if (finished && !is_forwarding() && !is_receiving()) {
-		pr_info("Closing connection to Image Cache.\n");
-		close(proxy_to_cache_fd);
-		unlock_workers();
+	pr_info("[fd=%d] Forwarding %s request for %s:%s (%lu bytes)\n",
+		rop->fd,
+		rop->flags == O_RDONLY ? "read" :
+		    rop->flags == O_APPEND ? "append" : "write",
+		rop->path, rop->snapshot_id, (unsigned long)rop->size);
+
+	/* TODO - test switching rop->fd back to non-blocking here, once the
+	 * header write above is done in blocking mode. */
+
+	/* The body is sent from the event loop; finish_proxy_read() clears the flag. */
+	forwarding = true;
+	event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+}
+
+/* Read the next image header from the proxy and start receiving that image. */
+void handle_remote_accept(int fd)
+{
+	char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	int flags;
+	uint64_t size = 0;
+	/* Signed: read_remote_header() returns < 0 on error and 0 on EOF. */
+	int64_t ret;
+
+	/* TODO - test: socket_set_blocking(fd) while the header is read. */
+	ret = read_remote_header(fd, snapshot_id, path, &flags, &size);
+	if (ret < 0) {
+		pr_perror("Unable to receive remote header from image proxy");
+		goto err;
 	}
-	return NULL;
+	/* The image proxy closed the connection: no more images are coming. */
+	else if (!ret) {
+		finished_remote = true;
+		pr_info("Image Proxy connection closed.\n");
+		return;
+	}
+
+	/* TODO - test: socket_set_non_blocking(fd) again once the header is read. */
+	pr_info("[fd=%d] Received %s request for %s:%s with %lu bytes\n",
+		fd,
+		flags == O_RDONLY ? "read" :
+		    flags == O_APPEND ? "append" : "write",
+		path, snapshot_id, (unsigned long)size);
+	/*
+	 * The body is read from the event loop; finish_cache_write() clears
+	 * forwarding and re-arms proxy_to_cache_fd for the next header.
+	 */
+	forwarding = true;
+	handle_accept_write(fd, snapshot_id, path, flags, false, size);
+	return;
+err:
+	close(fd);
 }
 
 void handle_local_accept(int fd)
 {
-	struct wthread *wt = NULL;
 	int cli_fd;
-	pthread_t tid;
+    char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+    int flags = 0;
 	struct sockaddr_in cli_addr;
 	socklen_t clilen = sizeof(cli_addr);
 
 	cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
 	if (cli_fd < 0) {
 		pr_perror("Unable to accept local image connection");
-		goto err;
+		return;
 	}
 
-	wt = new_worker();
-	wt->fd = cli_fd;
-
-	if (read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags)) < 0) {
+	if (read_header(cli_fd, snapshot_id, path, &flags) < 0) {
 		pr_err("Error reading local image header\n");
 		goto err;
 	}
 
-	/* These function calls are used to avoid other threads from
-	 * thinking that there are no more images are coming.
-	 */
-	if (wt->flags != O_RDONLY) {
-		prepare_recv_rimg();
-		prepare_fwd_rimg();
+	pr_info("[fd=%d] Received %s request for %s:%s\n",
+			cli_fd,
+			flags == O_RDONLY ? "read" :
+			flags == O_APPEND ? "append" : "write",
+			path, snapshot_id);
+	// NOTE(review): the handlers below may close cli_fd (error paths, RESTORE_FINISH).
+	// Write/Append case (only possible in img-proxy).
+	if (flags != O_RDONLY) {
+        handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
+	}
+	// Read case while restoring (img-cache).
+	else if (restoring) {
+        handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
+	}
+	// Read case while dumping (img-proxy).
+	else {
+        handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
 	}
 
-	pr_info("Received %s request for %s:%s\n",
-			wt->flags == O_RDONLY ? "read" :
-			wt->flags == O_APPEND ? "append" : "write",
-			wt->path, wt->snapshot_id);
+    // Set non-blocking after dispatch (presumably so reply headers above are sent blocking).
+    socket_set_non_blocking(cli_fd);
+
+    return;
+err:
+    close(cli_fd);
+}
 
-	if (pthread_create(
-		    &tid, NULL, process_local_image_connection, (void *) wt)) {
-		pr_perror("Unable to create worker thread");
-		goto err;
+/* Proxy side: called once a send (a forward or a local read) has completed. */
+void finish_proxy_read(struct roperation *rop)
+{
+	/* Local reads need no follow-up; only forwards to the cache do. */
+	if (rop->fd != proxy_to_cache_fd)
+		return;
+
+	/* Record how far the image was forwarded, so a later append resumes there. */
+	rop->rimg->curr_fwd_buf = rop->curr_sent_buf;
+	rop->rimg->curr_fwd_bytes = rop->curr_sent_bytes;
+	forwarding = false;
+
+	/* If there are images waiting to be forwarded, forward the next one. */
+	if (!list_empty(&rop_forwarding))
+		forward_remote_image(list_entry(rop_forwarding.next, struct roperation, l));
+}
+
+void finish_proxy_write(struct roperation *rop)
+{
+	struct roperation *rop_to_forward;
+
+	/* DUMP_FINISH is a marker, not an image: no more local images are coming. */
+	if (!strncmp(rop->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
+		finish_local();
+		return;
+	}
+
+	/* Normal image received: publish it and queue it for forwarding. */
+	list_add_tail(&(rop->rimg->l), &rimg_head);
+	rop_to_forward = new_remote_operation(rop->path, rop->snapshot_id,
+					      proxy_to_cache_fd, rop->flags, false);
+	if (rop_to_forward == NULL)	/* already logged by new_remote_operation() */
+		return;
+	rop_set_rimg(rop_to_forward, rop->rimg);
+	/* Only the list head is on the wire; finish_proxy_read() starts the next. */
+	if (list_empty(&rop_forwarding))
+		forward_remote_image(rop_to_forward);
+	list_add_tail(&(rop_to_forward->l), &rop_forwarding);
+}
+
+void finish_cache_write(struct roperation *rop)
+{
+	struct roperation *prop = get_rop_by_name(
+		&rop_pending, rop->snapshot_id, rop->path);
+
+	/* The proxy fd is free again: wait for the next image header on it. */
+	forwarding = false;
+	event_set(epoll_fd, EPOLL_CTL_ADD, proxy_to_cache_fd, EPOLLIN, &proxy_to_cache_fd);
+
+	/* The image is complete: make it visible to get_rimg_by_name(). */
+	list_add_tail(&(rop->rimg->l), &rimg_head);
+
+	/* TODO - what if we have multiple requests for the same name? */
+	if (prop == NULL)
+		return;
+	pr_info("\t[fd=%d] Resuming pending %s for %s:%s\n",
+		prop->fd,
+		prop->flags == O_APPEND ?
+			"append" : prop->flags == O_RDONLY ?
+				"read" : "write",
+		prop->path, prop->snapshot_id);
+
+	/* The request leaves rop_pending whether or not the reply succeeds. */
+	list_del(&(prop->l));
+	if (write_reply_header(prop->fd, 0) < 0) {
+		pr_perror("Error writing reply header for %s:%s",
+			prop->path, prop->snapshot_id);
+		close(prop->fd);
+		free(prop);
+		return;
+	}
+	rop_set_rimg(prop, rop->rimg);
+	list_add_tail(&(prop->l), &rop_inprogress);
+	event_set(epoll_fd, EPOLL_CTL_ADD, prop->fd, EPOLLOUT, prop);
+}
+
+void handle_roperation(struct epoll_event *event, struct roperation *rop)
+{
+	bool sending = event->events & EPOLLOUT;
+	int64_t ret = sending ? send_image_async(rop) : recv_image_async(rop);
+
+	/*
+	 * NOTE(review): EAGAIN/EWOULDBLOCK are positive, so ret > 0 already covers
+	 * them; if the async helpers report -EAGAIN this test should use that.
+	 */
+	if (ret > 0 || ret == EAGAIN || ret == EWOULDBLOCK) {
+		/* Not finished: re-arm the fd for the same direction. */
+		event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd,
+			  sending ? EPOLLOUT : EPOLLIN, rop);
+		return;
 	}
-	wt->tid = tid;
-	add_worker(wt);
-	return;
+
+	/*
+	 * The operation is over, successfully or not: drop it from the list
+	 * that holds it (rop_inprogress or rop_forwarding).
+	 */
+	list_del(&(rop->l));
+
+	if (ret < 0) {
+		pr_perror("Unable to %s %s:%s (returned %ld)",
+			  sending ? "send" : "receive",
+			  rop->rimg->path, rop->rimg->snapshot_id, (long)ret);
+		goto err;
+	}
+
+	pr_info("[fd=%d] Finished %s %s:%s to CRIU (size %lu)\n",
+		rop->fd,
+		sending ? "sending" : "receiving",
+		rop->rimg->path, rop->rimg->snapshot_id,
+		(unsigned long)rop->rimg->size);
+
+	if (!sending) {
+		/* A receive finished: the whole image is now in memory. */
+		if (restoring) {
+			/* Cache side: image forwarded by the proxy. */
+			finish_cache_write(rop);
+		} else {
+			/* Proxy side: image written by the local CRIU. */
+			finish_proxy_write(rop);
+		}
+	} else {
+		/* A send finished: a forward or a local read on the proxy side. */
+		if (!restoring)
+			finish_proxy_read(rop);
+		/* Nothing to do when a read finishes on the cache side. */
+	}
+	/* NOTE(review): rop->close_fd is not consulted here - confirm who closes rop->fd. */
 err:
-	close(cli_fd);
-	free(wt);
+	free(rop);
 }
 
+void check_pending_forwards(void)
+{
+	struct roperation *rop = NULL;
+	struct rimage *rimg = NULL;
+	/* Forward the first queued operation whose image is now in memory. */
+	list_for_each_entry(rop, &rop_forwarding, l) {
+		rimg = get_rimg_by_name(rop->snapshot_id, rop->path);
+		if (rimg != NULL) {
+			rop_set_rimg(rop, rimg);
+			forward_remote_image(rop);
+			return;
+		}
+	}
+}
 
-void *accept_local_image_connections(void *port)
+void check_pending_reads(void)
 {
-	int fd = *((int *) port);
-	int epoll_fd;
-	struct epoll_event *events;
+	struct roperation *rop = NULL;
+	struct rimage *rimg = NULL;
+	/* NOTE(review): sends no reply header and leaves rop on rop_pending - confirm. */
+	list_for_each_entry(rop, &rop_pending, l) {
+		rimg = get_rimg_by_name(rop->snapshot_id, rop->path);
+		if (rimg != NULL) {
+			rop_set_rimg(rop, rimg);
+			event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+		}
+	}
+}
+
+void accept_image_connections() {
 	int ret;
 
 	epoll_fd = epoll_create(EPOLL_MAX_EVENTS);
 	if (epoll_fd < 0) {
 		pr_perror("Unable to open epoll");
-		return NULL;
+		return;
 	}
 
 	events = calloc(EPOLL_MAX_EVENTS, sizeof(struct epoll_event));
@@ -705,57 +882,89 @@ void *accept_local_image_connections(void *port)
 		goto end;
 	}
 
-	ret = event_set(epoll_fd, EPOLL_CTL_ADD, fd, EPOLLIN, &fd);
+	ret = event_set(epoll_fd, EPOLL_CTL_ADD, local_req_fd, EPOLLIN, &local_req_fd);
 	if (ret) {
-		pr_perror("Failed to set event for epoll");
+		pr_perror("Failed to add local fd to epoll");
 		goto end;
 	}
 
+    // Only if we are restoring (cache-side) we need to add the remote sock to
+    // the epoll.
+    if (restoring) {
+        ret = event_set(epoll_fd, EPOLL_CTL_ADD, proxy_to_cache_fd,
+                    EPOLLIN, &proxy_to_cache_fd);
+        if (ret) {
+    		pr_perror("Failed to add proxy to cache fd to epoll");
+        	goto end;
+    	}
+    }
+
 	while (1) {
-		int n_events = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, -1);
+		int n_events = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, 250);
 		if (n_events < 0) {
 			pr_perror("Failed to epoll wait");
 			goto end;
 		}
 
 		for (int i = 0; i < n_events; i++) {
-			if (events[i].data.ptr == &fd) {
+            // Accept from local dump/restore?
+			if (events[i].data.ptr == &local_req_fd) {
 				if ( events[i].events & EPOLLHUP ||
 				     events[i].events & EPOLLERR) {
-					if (!finished)
+					if (!finished_local)
 						pr_perror("Unable to accept more local image connections");
 					goto end;
 				}
-				// accept
-				pr_perror("Calling accept %d", i);
-				handle_local_accept(fd);
+				handle_local_accept(local_req_fd);
 			}
+            else if (restoring && !forwarding && events[i].data.ptr == &proxy_to_cache_fd) {
+	            event_set(epoll_fd, EPOLL_CTL_DEL, proxy_to_cache_fd, 0, 0);
+                handle_remote_accept(proxy_to_cache_fd);
+            }
 			else {
-				// TODO - handle write/read
-				pr_perror("Event on unexpected file descripor");
-				goto end;
+				struct roperation *rop =
+					(struct roperation*)events[i].data.ptr;
+	            event_set(epoll_fd, EPOLL_CTL_DEL, rop->fd, 0, 0);
+				handle_roperation(&events[i], rop);
 			}
 		}
-	}
+
+        // Check if there are any pending operations
+        if (restoring)
+            check_pending_reads();
+		else if (!forwarding)
+            check_pending_forwards();
+
+		// Check if we can close the tcp socket (this will unblock the cache
+		// to answer "no image" to restore).
+		if (!restoring &&
+				finished_local &&
+				!finished_remote &&
+				list_empty(&rop_forwarding)) {
+			close(proxy_to_cache_fd);
+			finished_remote = true;
+		}
+
+        // If both local and remote sockets are closed, leave.
+        if (finished_local && finished_remote) {
+			pr_info("\tFinished both local and remote, exiting\n");
+            goto end;
+		}
+    }
 end:
+	// TODO - release pending when no receiving and finished.
 	close(epoll_fd);
-	close(fd);
+	close(local_req_fd);
 	free(events);
-	return NULL;
 }
 
 
 int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
 {
 	int ret;
-	struct roperation *op = malloc(sizeof(struct roperation));
-	bzero(op, sizeof(struct roperation));
-	op->fd = fd;
-	op->rimg = rimg;
-	op->size = size;
-	op->flags = flags;
-	op->close_fd = close_fd;
-	op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+    struct roperation *op = new_remote_operation(
+        rimg->path, rimg->snapshot_id, fd, flags, close_fd);
+    rop_set_rimg(op, rimg);
 	while ((ret = recv_image_async(op)) < 0)
 		if (ret != EAGAIN && ret != EWOULDBLOCK)
 			return -1;
@@ -771,70 +980,58 @@ int64_t recv_image_async(struct roperation *op)
 	int fd = op->fd;
 	struct rimage *rimg = op->rimg;
 	uint64_t size = op->size;
-	int flags = op->flags;
 	bool close_fd = op->close_fd;
 	struct rbuf *curr_buf = op->curr_recv_buf;
 	int n;
 
-	if (curr_buf == NULL) {
-		if (flags == O_APPEND)
-			curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
-		else
-			curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-	}
-
-	while (1) {
-		n = read(fd,
+	n = read(fd,
 			 curr_buf->buffer + curr_buf->nbytes,
 			 size ?
 			     min((int) (size - rimg->size), BUF_SIZE - curr_buf->nbytes) :
 			     BUF_SIZE - curr_buf->nbytes);
-		if (n == 0) {
-			if (close_fd)
-				close(fd);
-			return rimg->size;
-		} else if (n > 0) {
-			curr_buf->nbytes += n;
-			rimg->size += n;
-			if (curr_buf->nbytes == BUF_SIZE) {
-			  struct rbuf *buf = malloc(sizeof(struct rbuf));
-				if (buf == NULL) {
-					pr_perror("Unable to allocate remote_buffer structures");
-					if (close_fd)
-						close(fd);
-					return -1;
-				}
-				buf->nbytes = 0;
-				list_add_tail(&(buf->l), &(rimg->buf_head));
-				curr_buf = buf;
-			}
-			if (size && rimg->size == size) {
+	if (n == 0) {
+		if (close_fd)
+			close(fd);
+		return n;
+	} else if (n > 0) {
+		curr_buf->nbytes += n;
+		rimg->size += n;
+		if (curr_buf->nbytes == BUF_SIZE) {
+			struct rbuf *buf = malloc(sizeof(struct rbuf));
+			if (buf == NULL) {
+				pr_perror("Unable to allocate remote_buffer structures");
 				if (close_fd)
 					close(fd);
-				return rimg->size;
+				return -1;
 			}
-		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-			return errno;
-		} else {
-			pr_perror("Read on %s:%s socket failed",
-				rimg->path, rimg->snapshot_id);
+			buf->nbytes = 0;
+			list_add_tail(&(buf->l), &(rimg->buf_head));
+			op->curr_recv_buf = buf;
+			return n;
+		}
+		if (size && rimg->size == size) {
 			if (close_fd)
 				close(fd);
-			return -1;
+			return 0;
 		}
+	} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+		return errno;
+	} else {
+		pr_perror("Read for %s:%s socket on fd=%d failed",
+			rimg->path, rimg->snapshot_id, fd);
+		if (close_fd)
+			close(fd);
+		return -1;
 	}
+	return n;
 }
 
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
 {
 	int ret;
-	struct roperation *op = malloc(sizeof(struct roperation));
-	bzero(op, sizeof(struct roperation));
-	op->fd = fd;
-	op->rimg = rimg;
-	op->flags = flags;
-	op->close_fd = close_fd;
-	op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	struct roperation *op = new_remote_operation(
+        rimg->path, rimg->snapshot_id, fd, flags, close_fd);
+    rop_set_rimg(op, rimg);
 	while ((ret = send_image_async(op)) < 0)
 		if (ret != EAGAIN && ret != EWOULDBLOCK)
 			return -1;
@@ -845,47 +1042,43 @@ int64_t send_image_async(struct roperation *op)
 {
 	int fd = op->fd;
 	struct rimage *rimg = op->rimg;
-	int flags = op->flags;
 	bool close_fd = op->close_fd;
 	int n;
 
-	if (flags != O_APPEND) {
-		op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
-		op->curr_sent_bytes = 0;
-	}
-
-	while (1) {
-		n = send(
-		    fd,
-		    op->curr_sent_buf->buffer + op->curr_sent_bytes,
-		    min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes,
-		    MSG_NOSIGNAL);
-		if (n > -1) {
-			op->curr_sent_bytes += n;
-			if (op->curr_sent_bytes == BUF_SIZE) {
-				op->curr_sent_buf =
-				    list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
-				op->nblocks++;
-				op->curr_sent_bytes = 0;
-			} else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
-				if (close_fd)
-					close(fd);
-				return op->nblocks*BUF_SIZE + op->curr_sent_buf->nbytes;
-			}
-		} else if (errno == EPIPE || errno == ECONNRESET) {
-			pr_warn("Connection for %s:%s was closed early than expected\n",
-				rimg->path, rimg->snapshot_id);
-			return 0;
-		} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-			return errno;
+	n = write(
+		fd,
+		op->curr_sent_buf->buffer + op->curr_sent_bytes,
+		min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes);
+
+	if (n > -1) {
+		op->curr_sent_bytes += n;
+		if (op->curr_sent_bytes == BUF_SIZE) {
+			op->curr_sent_buf =
+			    list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
+			op->curr_sent_bytes = 0;
+			return n;
 		}
-		else {
-			pr_perror("Write on %s:%s socket failed",
-				rimg->path, rimg->snapshot_id);
-			return -1;
+		// TODO - couldn't we just compare to the img size?
+		else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
+			if (close_fd)
+				close(fd);
+			return 0;
 		}
+		return n;
+	}
+	// TODO - couldn't these checks be made upstream?
+	else if (errno == EPIPE || errno == ECONNRESET) {
+		pr_warn("Connection for %s:%s was closed early than expected\n",
+			rimg->path, rimg->snapshot_id);
+		return 0;
+	} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+		return errno;
+	}
+	else {
+		pr_perror("Write on %s:%s socket failed",
+			rimg->path, rimg->snapshot_id);
+		return -1;
 	}
-
 }
 
 int read_remote_image_connection(char *snapshot_id, char *path)
diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
index 779a137f..029857f7 100644
--- a/criu/include/img-remote.h
+++ b/criu/include/img-remote.h
@@ -32,32 +32,31 @@ struct rbuf {
 };
 
 struct rimage {
+	/* Path and snapshot id together identify the image. */
 	char path[PATHLEN];
 	char snapshot_id[PATHLEN];
+	/* List anchor (presumably for the image list searched by get_rimg_by_name()). */
 	struct list_head l;
+	/* Head of the list of struct rbuf buffers holding the data (new ones appended at the tail). */
 	struct list_head buf_head;
-	uint64_t size; /* number of bytes */
-	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
-};
-
-struct wthread {
-	pthread_t tid;
-	struct list_head l;
-	/* Client fd. */
-	int fd;
-	/* The path and snapshot_id identify the request handled by this thread. */
-	char path[PATHLEN];
-	char snapshot_id[PATHLEN];
-	int flags;
-	/* This semph is used to wake this thread if the image is in memory.*/
-	sem_t wakeup_sem;
+	/* Total number of image bytes stored across all buffers in buf_head. */
+	uint64_t size;
+	/* Forward (send) operations only: buffer where the next forward starts. */
+	struct rbuf *curr_fwd_buf;
+	/* Forward (send) operations only: number of forwarded bytes in 'curr_fwd_buf'. */
+	uint64_t curr_fwd_bytes;
 };
 
 /* Structure that describes the state of a remote operation on remote images. */
 struct roperation {
+	/* List anchor. */
+	struct list_head l;
 	/* File descriptor being used. */
 	int fd;
-	/* Remote image being used. */
+  /* Path and snapshot id identify the required image. */
+  char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	/* Remote image being used (may be null if the operation is pending). */
 	struct rimage *rimg;
 	/* Flags for the operation. */
 	int flags;
@@ -66,37 +65,23 @@ struct roperation {
 	/* Note: recv operation only. How much bytes should be received. */
 	uint64_t size;
 	/* Note: recv operation only. Buffer being writen. */
-	struct rbuf *curr_recv_buf;
-	/* Note: send operation only. Number of blocks already sent. */
-	int nblocks;
+	struct rbuf *curr_recv_buf; // TODO - needed? Could be replaced by list.last!
 	/* Note: send operation only. Pointer to buffer being sent. */
 	struct rbuf *curr_sent_buf;
 	/* Note: send operation only. Number of bytes sent in 'curr_send_buf. */
 	uint64_t curr_sent_bytes;
 };
 
-/* This variable is used to indicate when the dump is finished. */
-extern bool finished;
 /* This is the proxy to cache TCP socket FD. */
 extern int proxy_to_cache_fd;
 /* This the unix socket used to fulfill local requests. */
 extern int local_req_fd;
+/* True if we are running the cache/restore, false if proxy/dump. */
+extern bool restoring;
 
-int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*));
-
-void join_workers(void);
-void unlock_workers(void);
-
-void prepare_recv_rimg(void);
-void finalize_recv_rimg(struct rimage *rimg);
-struct rimage *prepare_remote_image(char *path, char *namesapce, int flags);
+void accept_image_connections(void); /* Serve local (and, when restoring, remote) image requests until finished or on error. */
 struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path);
-bool is_receiving(void);
-
-void *accept_local_image_connections(void *ptr);
-void *accept_remote_image_connections(void *ptr);
 
-int64_t forward_image(struct rimage *rimg);
 int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
 int64_t send_image_async(struct roperation *op);
 int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
-- 
2.17.0



More information about the CRIU mailing list