[CRIU] [Process Migration using Sockets v4 - Patch 2/3]

Rodrigo Bruno rbruno at gsd.inesc-id.pt
Fri Dec 11 14:23:37 PST 2015


Here goes the second part of the patch.

I am sending this again because I fixed the code (checkpatch.pl warnings and errors).

I also had to split the way we wait for images. There are now two separate
implementations of waiting for images, one for the proxy and one for the cache.
The reason is that the proxy may be asked for nonexistent files (for example,
files of subprocesses that existed before but no longer exist), and it cannot
wait for those files. On the cache side, on the other hand, we want to wait for
everything.

Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
>From 498c3d9686590bfb6648c6ecc3ddad662cf6e635 Mon Sep 17 00:00:00 2001
From: rodrigo-bruno <rbruno at gsd.inesc-id.pt>
Date: Fri, 11 Dec 2015 22:07:37 +0000
Subject: [PATCH 2/3] Process Migration using Sockets v4 (2/3)

---
 img-cache.c                | 169 +++++++++++
 img-proxy.c                | 119 ++++++++
 img-remote-proto.c         | 693 +++++++++++++++++++++++++++++++++++++++++++++
 include/img-remote-proto.h |  85 ++++++
 4 files changed, 1066 insertions(+)
 create mode 100644 img-cache.c
 create mode 100644 img-proxy.c
 create mode 100644 img-remote-proto.c
 create mode 100644 include/img-remote-proto.h

diff --git a/img-cache.c b/img-cache.c
new file mode 100644
index 0000000..e2ba7b0
--- /dev/null
+++ b/img-cache.c
@@ -0,0 +1,169 @@
+#include <unistd.h>
+
+#include "img-remote.h"
+#include "img-remote-proto.h"
+#include "criu-log.h"
+#include <pthread.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+#include "cr_options.h"
+
+/* Answer a pending read request on wt->fd.
+ *
+ * If @rimg is not NULL, a success reply header is written and @rimg is
+ * returned so that the caller can stream the image; wt->fd stays open.
+ * Otherwise an ENOENT reply is written and wt->fd is closed. If writing the
+ * reply fails, wt->fd is closed and NULL is returned.
+ */
+static struct rimage *reply_to_reader(struct wthread *wt, struct rimage *rimg)
+{
+	/* write_reply_header() returns uint64_t, so its -1 error value is
+	 * only visible after converting back to a signed type.
+	 */
+	if (rimg != NULL) {
+		if ((int64_t) write_reply_header(wt->fd, 0) < 0) {
+			pr_perror("Error writing reply header for %s:%s",
+			    wt->path, wt->snapshot_id);
+			close(wt->fd);
+			return NULL;
+		}
+		return rimg;
+	}
+
+	pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
+	if ((int64_t) write_reply_header(wt->fd, ENOENT) < 0)
+		pr_perror("Error writing reply header for unexisting image");
+	close(wt->fd);
+	return NULL;
+}
+
+/* Image-cache strategy for serving a read request (installed through
+ * init_daemon()). The cache waits for every image: if the image is not in
+ * memory yet and the dump may still deliver it, block until woken up.
+ *
+ * Returns the image (success header already sent, wt->fd still open) or
+ * NULL (wt->fd has been closed).
+ */
+static struct rimage *wait_for_image(struct wthread *wt)
+{
+	struct rimage *result = get_rimg_by_name(wt->snapshot_id, wt->path);
+
+	/* Already in memory, or it does not exist and we do not expect new
+	 * files: answer right away.
+	 */
+	if (result != NULL || (finished && !is_receiving()))
+		return reply_to_reader(wt, result);
+
+	/* NOTE: at this point, when the thread wakes up, either the image is
+	 * already in memory or it will never come (the dump is finished).
+	 */
+	sem_wait(&(wt->wakeup_sem));
+	return reply_to_reader(wt, get_rimg_by_name(wt->snapshot_id, wt->path));
+}
+
+/* The image cache creates a thread that calls this function. It accepts one
+ * connection from the image proxy and receives remote images from it until
+ * the proxy closes the connection, which means no more images are coming.
+ *
+ * @port: pointer to the listening TCP socket fd.
+ * Always returns NULL; the proxy connection is closed before returning.
+ */
+void *accept_remote_image_connections(void *port)
+{
+	int fd = *((int *) port);
+	struct sockaddr_in cli_addr;
+	socklen_t clilen = sizeof(cli_addr);
+	char snapshot_id_buf[PATHLEN], path_buf[PATHLEN];
+	uint64_t size;
+	/* The protocol helpers return uint64_t; keep their results signed so
+	 * that the -1 error value is actually detected.
+	 */
+	int64_t ret;
+	int flags, proxy_fd;
+	struct rimage *rimg;
+
+	proxy_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
+	if (proxy_fd < 0) {
+		pr_perror("Unable to accept remote image connection from image proxy");
+		return NULL;
+	}
+	while (1) {
+		ret = (int64_t) read_remote_header(proxy_fd, snapshot_id_buf, path_buf, &flags, &size);
+		if (ret < 0) {
+			pr_perror("Unable to receive remote header from image proxy");
+			goto err;
+		}
+		/* This means that no more images are coming. */
+		if (!ret) {
+			pr_info("Image Proxy connection closed.\n");
+			close(proxy_fd);
+			finished = true;
+			unlock_workers();
+			return NULL;
+		}
+
+		pr_info("Received %s request for %s:%s\n",
+			flags == O_RDONLY ? "read" :
+				flags == O_APPEND ? "append" : "write",
+			path_buf, snapshot_id_buf);
+
+		rimg = prepare_remote_image(path_buf, snapshot_id_buf, flags);
+		if (rimg == NULL) {
+			pr_perror("Unable to prepare %s:%s", path_buf, snapshot_id_buf);
+			goto err;
+		}
+
+		prepare_recv_rimg();
+		/* recv_image() treats a zero size as "read until EOF", which
+		 * would swallow the rest of the stream for an empty image. The
+		 * announced size is the total image size, so an empty payload
+		 * simply leaves rimg->size as it is.
+		 */
+		if (size)
+			ret = (int64_t) recv_image(proxy_fd, rimg, size, flags, false);
+		else
+			ret = (int64_t) rimg->size;
+		if (ret < 0) {
+			pr_perror("Unable to receive %s:%s from image proxy",
+				rimg->path, rimg->snapshot_id);
+			finalize_recv_rimg(NULL);
+			goto err;
+		} else if ((uint64_t) ret != size) {
+			pr_perror("Unable to receive %s:%s from image proxy (received %llu bytes, expected %llu bytes)",
+			    rimg->path, rimg->snapshot_id,
+			    (unsigned long long) ret, (unsigned long long) size);
+			finalize_recv_rimg(NULL);
+			goto err;
+		}
+		finalize_recv_rimg(rimg);
+
+		pr_info("Finished receiving %s:%s (received %llu bytes)\n",
+			rimg->path, rimg->snapshot_id, (unsigned long long) ret);
+	}
+
+err:
+	close(proxy_fd);
+	return NULL;
+}
+
+/* Entry point of the image cache daemon.
+ *
+ * Receives images from the image proxy over TCP port @cache_write_port (or
+ * over opts.ps_socket when it is set) and serves local CRIU requests on the
+ * UNIX socket at @local_cache_path, each from its own thread.
+ *
+ * Returns -1 if setup fails, 0 once the worker and accept threads are done.
+ */
+int image_cache(char *local_cache_path, unsigned short cache_write_port)
+{
+	pthread_t remote_thread, local_thread;
+	int remote_fd, local_fd;
+
+	pr_info("Proxy to Cache Port %d, CRIU to Cache Path %s\n", cache_write_port, local_cache_path);
+
+	if (opts.ps_socket == -1) {
+		remote_fd = setup_TCP_server_socket(cache_write_port);
+		if (remote_fd < 0) {
+			pr_perror("Unable to open proxy to cache TCP socket");
+			return -1;
+		}
+	} else {
+		remote_fd = opts.ps_socket;
+		pr_info("Re-using ps socket %d\n", remote_fd);
+	}
+
+	local_fd = setup_UNIX_server_socket(local_cache_path);
+	if (local_fd < 0) {
+		pr_perror("Unable to open cache to proxy UNIX socket");
+		return -1;
+	}
+
+	if (init_daemon(wait_for_image)) {
+		pr_perror("Unable to initialize daemon");
+		return -1;
+	}
+
+	/* The threads receive pointers to these locals; join_workers() below
+	 * keeps this frame alive while they run.
+	 */
+	if (pthread_create(&remote_thread, NULL,
+			   accept_remote_image_connections, (void *) &remote_fd)) {
+		pr_perror("Unable to create remote requests thread");
+		return -1;
+	}
+	if (pthread_create(&local_thread, NULL,
+			   accept_local_image_connections, (void *) &local_fd)) {
+		pr_perror("Unable to create local requests thread");
+		return -1;
+	}
+
+	join_workers();
+
+	pthread_join(remote_thread, NULL);
+	pthread_join(local_thread, NULL);
+	return 0;
+}
diff --git a/img-proxy.c b/img-proxy.c
new file mode 100644
index 0000000..6d17499
--- /dev/null
+++ b/img-proxy.c
@@ -0,0 +1,119 @@
+#include <unistd.h>
+
+#include "img-remote.h"
+#include "img-remote-proto.h"
+#include "criu-log.h"
+#include <pthread.h>
+#include <fcntl.h>
+#include "cr_options.h"
+
+/* Socket the proxy uses to forward images to the image cache. It is set up
+ * in image_proxy() (a new TCP client connection or opts.ps_socket) and
+ * written to by forward_image().
+ */
+int proxy_to_cache_fd;
+
+/* Image-proxy strategy for serving a read request (installed through
+ * init_daemon()). Unlike the cache, the proxy never waits: a requested image
+ * may legitimately never exist (e.g. files of subprocesses that are gone), so
+ * a missing image is answered with ENOENT right away.
+ *
+ * Returns the image (success header sent, wt->fd left open for streaming)
+ * or NULL (wt->fd has been closed).
+ */
+static struct rimage *wait_for_image(struct wthread *wt)
+{
+	struct rimage *result = get_rimg_by_name(wt->snapshot_id, wt->path);
+
+	/* write_reply_header() returns uint64_t: convert back to a signed type
+	 * so that its -1 error value is detected.
+	 */
+	if (result == NULL) {
+		pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
+		if ((int64_t) write_reply_header(wt->fd, ENOENT) < 0)
+			pr_perror("Error writing reply header for unexisting image");
+		/* The fd is closed whether or not the reply made it out. */
+		close(wt->fd);
+		return NULL;
+	}
+
+	if ((int64_t) write_reply_header(wt->fd, 0) < 0) {
+		pr_perror("Error writing reply header for %s:%s",
+		    wt->path, wt->snapshot_id);
+		close(wt->fd);
+		return NULL;
+	}
+	return result;
+}
+
+/* Forward @rimg from the proxy to the image cache over proxy_to_cache_fd.
+ *
+ * The header announces the total image size; send_image() is called with
+ * O_APPEND so only the part not forwarded yet goes out. The special
+ * DUMP_FINISH image is not forwarded: it only marks the dump as finished.
+ *
+ * Returns send_image()'s byte count (0 for DUMP_FINISH), or (uint64_t) -1
+ * on error.
+ */
+uint64_t forward_image(struct rimage *rimg)
+{
+	int64_t ret;
+	int fd = proxy_to_cache_fd;
+
+	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
+		finished = true;
+		/* TODO - how to kill the accept thread? Close the accept fd? */
+		return 0;
+	}
+
+	pthread_mutex_lock(&(rimg->in_use));
+	/* The protocol helpers return uint64_t; convert back to a signed type
+	 * so that their -1 error value is detected.
+	 */
+	if ((int64_t) write_remote_header(
+		fd, rimg->snapshot_id, rimg->path, O_APPEND, rimg->size) < 0) {
+		pr_perror("Error writing header for %s:%s",
+			rimg->path, rimg->snapshot_id);
+		ret = -1;
+		goto out;
+	}
+
+	ret = (int64_t) send_image(fd, rimg, O_APPEND, false);
+	if (ret < 0) {
+		pr_perror("Unable to send %s:%s to image cache",
+			rimg->path, rimg->snapshot_id);
+		ret = -1;
+	} else if ((uint64_t) ret != rimg->size) {
+		pr_perror("Unable to send %s:%s to image cache (sent %llu bytes, expected %llu bytes)",
+		    rimg->path, rimg->snapshot_id,
+		    (unsigned long long) ret, (unsigned long long) rimg->size);
+		ret = -1;
+	} else {
+		pr_info("Finished forwarding %s:%s (sent %llu bytes)\n",
+		    rimg->path, rimg->snapshot_id, (unsigned long long) rimg->size);
+	}
+
+out:
+	pthread_mutex_unlock(&(rimg->in_use));
+	return (uint64_t) ret;
+}
+
+/* Entry point of the image proxy daemon.
+ *
+ * Serves local CRIU requests on the UNIX socket at @local_proxy_path and
+ * forwards dumped images to the image cache at @fwd_host:@fwd_port (or over
+ * opts.ps_socket when it is set).
+ *
+ * Returns -1 if setup fails, 0 once the worker and accept threads are done.
+ */
+int image_proxy(char *local_proxy_path, char *fwd_host, unsigned short fwd_port)
+{
+	pthread_t local_thread;
+	int local_fd;
+
+	pr_info("CRIU to Proxy Path: %s, Cache Address %s:%hu\n",
+		local_proxy_path, fwd_host, fwd_port);
+
+	local_fd = setup_UNIX_server_socket(local_proxy_path);
+	if (local_fd < 0) {
+		pr_perror("Unable to open CRIU to proxy UNIX socket");
+		return -1;
+	}
+
+	if (opts.ps_socket == -1) {
+		proxy_to_cache_fd = setup_TCP_client_socket(fwd_host, fwd_port);
+		if (proxy_to_cache_fd < 0) {
+			pr_perror("Unable to open proxy to cache TCP socket");
+			return -1;
+		}
+	} else {
+		proxy_to_cache_fd = opts.ps_socket;
+		pr_info("Re-using ps socket %d\n", proxy_to_cache_fd);
+	}
+
+	if (init_daemon(wait_for_image) != 0)
+		return -1;
+
+	if (pthread_create(&local_thread, NULL,
+			   accept_local_image_connections, (void *) &local_fd)) {
+		pr_perror("Unable to create local requests thread");
+		return -1;
+	}
+
+	join_workers();
+
+	pthread_join(local_thread, NULL);
+	return 0;
+}
diff --git a/img-remote-proto.c b/img-remote-proto.c
new file mode 100644
index 0000000..a0a68d7
--- /dev/null
+++ b/img-remote-proto.c
@@ -0,0 +1,693 @@
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <semaphore.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include "sys/un.h"
+#include <pthread.h>
+#include <fcntl.h>
+
+#include "img-remote-proto.h"
+#include "criu-log.h"
+#include "compiler.h"
+
+#include "protobuf.h"
+#include "protobuf/remote-image.pb-c.h"
+#include "image.h"
+
+/* Images held in memory (struct rimage, linked through ->l), guarded by
+ * rimg_lock. rimg_lock also guards the 'writing' and 'forwarding' counters
+ * below.
+ */
+LIST_HEAD(rimg_head);
+pthread_mutex_t rimg_lock;
+
+/* Held around forward_image() so that only one image at a time goes over
+ * the proxy-to-cache connection.
+ */
+pthread_mutex_t proxy_to_cache_lock;
+
+/* Worker threads (struct wthread, linked through ->l), guarded by
+ * workers_lock. workers_semph is posted once per added worker so that
+ * join_workers() can sleep while the list is empty.
+ */
+LIST_HEAD(workers_head);
+pthread_mutex_t workers_lock;
+sem_t workers_semph;
+
+/* Read-request strategy (cache or proxy flavour), set by init_daemon(). */
+struct rimage * (*wait_for_image) (struct wthread *wt);
+
+/* Set once no more images are expected.
+ * NOTE(review): written and read by several threads without a lock or an
+ * atomic type.
+ */
+bool finished = false;
+/* Number of images currently being received. */
+int writing = 0;
+/* Number of write requests accepted but not yet forwarded to the cache. */
+int forwarding = 0;
+
+/* Look up an in-memory image by snapshot id and path.
+ * Returns the registered image, or NULL when there is none.
+ */
+struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path)
+{
+	struct rimage *cur, *found = NULL;
+
+	pthread_mutex_lock(&rimg_lock);
+	list_for_each_entry(cur, &rimg_head, l) {
+		if (strncmp(cur->path, path, PATHLEN) == 0 &&
+		    strncmp(cur->snapshot_id, snapshot_id, PATHLEN) == 0) {
+			found = cur;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&rimg_lock);
+
+	return found;
+}
+
+/* Find the registered worker serving (snapshot_id, path), or NULL. */
+static struct wthread *get_wt_by_name(const char *snapshot_id, const char *path)
+{
+	struct wthread *cur, *match = NULL;
+
+	pthread_mutex_lock(&workers_lock);
+	list_for_each_entry(cur, &workers_head, l) {
+		if (strncmp(cur->path, path, PATHLEN) == 0 &&
+		    strncmp(cur->snapshot_id, snapshot_id, PATHLEN) == 0) {
+			match = cur;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&workers_lock);
+
+	return match;
+}
+
+/* Initialize the mutexes and the semaphore shared by the daemon threads.
+ * Stops at the first failure, logs it and returns -1; returns 0 otherwise.
+ */
+static int init_sync_structures()
+{
+	if (pthread_mutex_init(&rimg_lock, NULL)) {
+		pr_perror("Remote image list mutex init failed");
+		return -1;
+	}
+	if (pthread_mutex_init(&proxy_to_cache_lock, NULL)) {
+		pr_perror("Remote image connection mutex init failed");
+		return -1;
+	}
+	if (pthread_mutex_init(&workers_lock, NULL)) {
+		pr_perror("Workers mutex init failed");
+		return -1;
+	}
+	/* Starts at zero: join_workers() sleeps on it until a worker exists. */
+	if (sem_init(&workers_semph, 0, 0)) {
+		pr_perror("Workers semaphore init failed");
+		return -1;
+	}
+	return 0;
+}
+
+/* Record that one more image is being received; is_receiving() reports
+ * true until the matching finalize_recv_rimg() call.
+ */
+void prepare_recv_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	writing += 1;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* Finish a receive started with prepare_recv_rimg().
+ *
+ * @rimg: the received image, which gets registered in the image list and
+ *        whose waiting worker (if any) is woken up; NULL when the receive
+ *        failed and nothing should be registered.
+ */
+void finalize_recv_rimg(struct rimage *rimg)
+{
+	struct wthread *waiter;
+
+	pthread_mutex_lock(&rimg_lock);
+	writing--;
+	if (rimg != NULL)
+		list_add_tail(&(rimg->l), &rimg_head);
+	pthread_mutex_unlock(&rimg_lock);
+
+	if (rimg == NULL)
+		return;
+
+	/* Wake thread waiting for this image. */
+	waiter = get_wt_by_name(rimg->snapshot_id, rimg->path);
+	if (waiter != NULL)
+		sem_post(&(waiter->wakeup_sem));
+}
+
+/* True while at least one image receive is in progress. */
+bool is_receiving()
+{
+	bool busy;
+
+	pthread_mutex_lock(&rimg_lock);
+	busy = writing > 0;
+	pthread_mutex_unlock(&rimg_lock);
+
+	return busy;
+}
+
+/* Record that one more image still has to be forwarded to the cache. */
+static void prepare_fwd_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	forwarding += 1;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* Undo one prepare_fwd_rimg() once its forward attempt is over. */
+static void finalize_fwd_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	forwarding -= 1;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* True while at least one image is still waiting to be forwarded. */
+static bool is_forwarding()
+{
+	bool active;
+
+	pthread_mutex_lock(&rimg_lock);
+	active = forwarding > 0;
+	pthread_mutex_unlock(&rimg_lock);
+
+	return active;
+}
+
+/* Called once no more images are coming: post the wake-up semaphore of every
+ * registered worker, so that threads still waiting for an image can answer
+ * ENOENT (no such file) to their requester.
+ */
+void unlock_workers()
+{
+	struct wthread *worker;
+
+	pthread_mutex_lock(&workers_lock);
+	list_for_each_entry(worker, &workers_head, l) {
+		sem_post(&(worker->wakeup_sem));
+	}
+	pthread_mutex_unlock(&workers_lock);
+}
+
+/* Install @wfi as the read-request strategy (cache or proxy flavour) and
+ * initialize the shared synchronization objects.
+ * Returns 0 on success, -1 on failure.
+ */
+int init_daemon(struct rimage *(*wfi)(struct wthread*))
+{
+	int err;
+
+	wait_for_image = wfi;
+	err = init_sync_structures();
+
+	return err;
+}
+
+/* Create a TCP socket listening on all interfaces at @port (SO_REUSEADDR
+ * set, backlog DEFAULT_LISTEN).
+ * Returns the listening fd, or -1 on error; the socket is closed on error.
+ */
+int setup_TCP_server_socket(int port)
+{
+	struct sockaddr_in serv_addr;
+	int sockopt = 1;
+	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+
+	if (sockfd < 0) {
+		pr_perror("Unable to open image socket");
+		return -1;
+	}
+
+	memset(&serv_addr, 0, sizeof(serv_addr));
+	serv_addr.sin_family = AF_INET;
+	serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+	serv_addr.sin_port = htons(port);
+
+	if (setsockopt(
+	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) == -1) {
+		pr_perror("Unable to set SO_REUSEADDR");
+		goto err;
+	}
+
+	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+		pr_perror("Unable to bind image socket");
+		goto err;
+	}
+
+	if (listen(sockfd, DEFAULT_LISTEN)) {
+		pr_perror("Unable to listen image socket");
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	/* Do not leak the socket on a failed setup. */
+	close(sockfd);
+	return -1;
+}
+
+/* Connect a TCP socket to @hostname:@port (IPv4, resolved with
+ * gethostbyname()).
+ * Returns the connected fd, or -1 on error; the socket is closed on error.
+ */
+int setup_TCP_client_socket(char *hostname, int port)
+{
+	int sockfd;
+	struct sockaddr_in serv_addr;
+	struct hostent *server;
+
+	sockfd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open remote image socket");
+		return -1;
+	}
+
+	server = gethostbyname(hostname);
+	if (server == NULL) {
+		/* gethostbyname() reports failures through h_errno, not errno,
+		 * so pr_perror() would print an unrelated error.
+		 */
+		pr_err("Unable to get host by name (%s): %s\n",
+		       hostname, hstrerror(h_errno));
+		goto err;
+	}
+
+	memset(&serv_addr, 0, sizeof(serv_addr));
+	serv_addr.sin_family = AF_INET;
+	/* Never copy more than sin_addr can hold. */
+	memcpy(&serv_addr.sin_addr.s_addr, server->h_addr,
+	       (size_t) server->h_length < sizeof(serv_addr.sin_addr.s_addr) ?
+	       (size_t) server->h_length : sizeof(serv_addr.sin_addr.s_addr));
+	serv_addr.sin_port = htons(port);
+
+	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+		pr_perror("Unable to connect to remote %s", hostname);
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	close(sockfd);
+	return -1;
+}
+
+/* Create a UNIX stream socket listening at @path. Any existing file at @path
+ * is unlinked first.
+ * Returns the listening fd, or -1 on error; the socket is closed on error.
+ */
+int setup_UNIX_server_socket(char *path)
+{
+	struct sockaddr_un addr;
+	int sockfd;
+
+	/* Refuse paths that do not fit instead of silently binding to a
+	 * truncated one.
+	 */
+	if (strlen(path) >= sizeof(addr.sun_path)) {
+		pr_err("Image socket path too long: %s\n", path);
+		return -1;
+	}
+
+	sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open image socket");
+		return -1;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
+
+	unlink(path);
+
+	if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+		pr_perror("Unable to bind image socket");
+		goto err;
+	}
+
+	if (listen(sockfd, DEFAULT_LISTEN) == -1) {
+		pr_perror("Unable to listen image socket");
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	close(sockfd);
+	return -1;
+}
+
+/* Connect a UNIX stream socket to the server listening at @path.
+ * Returns the connected fd, or -1 on error; the socket is closed on error.
+ */
+int setup_UNIX_client_socket(char *path)
+{
+	struct sockaddr_un addr;
+	int sockfd;
+
+	/* A truncated path would connect to the wrong socket. */
+	if (strlen(path) >= sizeof(addr.sun_path)) {
+		pr_err("Local socket path too long: %s\n", path);
+		return -1;
+	}
+
+	sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open local image socket");
+		return -1;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
+
+	if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+		pr_perror("Unable to connect to local socket: %s", path);
+		close(sockfd);
+		return -1;
+	}
+
+	return sockfd;
+}
+
+/* Write one protobuf object @obj of PB type @type to the raw fd @fd by
+ * wrapping the fd in a temporary struct cr_img and calling pb_write_one().
+ * pb_write_one()'s result is returned as uint64_t, so a negative value shows
+ * up as a huge number; convert to a signed type before testing for < 0.
+ * NOTE(review): only img._x is initialized here; this assumes pb_write_one()
+ * reads no other field of struct cr_img — confirm.
+ */
+uint64_t pb_write_obj(int fd, void *obj, int type)
+{
+	struct cr_img img;
+
+	img._x.fd = fd;
+	/* Presumably switches the bfd to unbuffered (raw) mode so writes go
+	 * straight to fd — TODO confirm against bfd_setraw(). */
+	bfd_setraw(&img._x);
+	return pb_write_one(&img, obj, type);
+}
+
+/* Read one protobuf object of PB type @type from the raw fd @fd into *pobj
+ * by wrapping the fd in a temporary struct cr_img and calling
+ * do_pb_read_one(). The result is returned as uint64_t, so a negative value
+ * shows up as a huge number; convert to a signed type before testing for < 0.
+ * NOTE(review): only img._x is initialized here; this assumes
+ * do_pb_read_one() reads no other field of struct cr_img — confirm.
+ */
+uint64_t pb_read_obj(int fd, void **pobj, int type)
+{
+	struct cr_img img;
+
+	img._x.fd = fd;
+	bfd_setraw(&img._x);
+	/* The final 'true' presumably tells do_pb_read_one() that hitting EOF
+	 * is acceptable (reported as 0 rather than an error) — confirm. */
+	return do_pb_read_one(&img, pobj, type, true);
+}
+
+/* Send a LocalImageEntry request header for image @path of snapshot
+ * @snapshot_id, opened with @flags. Returns pb_write_obj()'s result.
+ */
+uint64_t write_header(int fd, char *snapshot_id, char *path, int flags)
+{
+	LocalImageEntry entry = LOCAL_IMAGE_ENTRY__INIT;
+
+	entry.open_mode = flags;
+	entry.snapshot_id = snapshot_id;
+	entry.name = path;
+
+	return pb_write_obj(fd, &entry, PB_LOCAL_IMAGE);
+}
+
+uint64_t write_reply_header(int fd, int error)
+{
+	LocalImageReplyEntry lir = LOCAL_IMAGE_REPLY_ENTRY__INIT;
+
+	lir.error = error;
+	return pb_write_obj(fd, &lir, PB_LOCAL_IMAGE_REPLY);
+}
+
+/* Send a RemoteImageEntry header announcing image @path of snapshot
+ * @snapshot_id, its open mode @flags and its total @size in bytes.
+ * Returns pb_write_obj()'s result.
+ */
+uint64_t write_remote_header(int fd, char *snapshot_id, char *path, int flags, uint64_t size)
+{
+	RemoteImageEntry hdr = REMOTE_IMAGE_ENTRY__INIT;
+
+	hdr.size = size;
+	hdr.open_mode = flags;
+	hdr.snapshot_id = snapshot_id;
+	hdr.name = path;
+
+	return pb_write_obj(fd, &hdr, PB_REMOTE_IMAGE);
+}
+
+/* Read a LocalImageEntry request header from @fd.
+ *
+ * When a header is read (result > 0), copies the snapshot id and image path
+ * into @snapshot_id and @path (both PATHLEN-byte buffers; the copies are
+ * always NUL-terminated and truncated if longer) and the open mode into
+ * @flags.
+ *
+ * Returns pb_read_obj()'s result, converted to uint64_t.
+ */
+uint64_t read_header(int fd, char *snapshot_id, char *path, int *flags)
+{
+	LocalImageEntry *li = NULL;
+	int ret = pb_read_obj(fd, (void **)&li, PB_LOCAL_IMAGE);
+
+	if (ret > 0) {
+		/* strncpy() alone does not terminate an over-long string. */
+		strncpy(snapshot_id, li->snapshot_id, PATHLEN - 1);
+		snapshot_id[PATHLEN - 1] = '\0';
+		strncpy(path, li->name, PATHLEN - 1);
+		path[PATHLEN - 1] = '\0';
+		*flags = li->open_mode;
+	}
+	/* The unpacked message owns separately allocated strings; a plain
+	 * free() of the message would leak them.
+	 */
+	if (li)
+		local_image_entry__free_unpacked(li, NULL);
+	return ret;
+}
+
+/* Read a LocalImageReplyEntry from @fd and, when one was read (result > 0),
+ * store its error code in @error. Returns pb_read_obj()'s result.
+ */
+uint64_t read_reply_header(int fd, int *error)
+{
+	LocalImageReplyEntry *reply;
+	int rc;
+
+	rc = pb_read_obj(fd, (void **)&reply, PB_LOCAL_IMAGE_REPLY);
+	if (rc > 0)
+		*error = reply->error;
+
+	free(reply);
+	return rc;
+}
+
+/* Read a RemoteImageEntry header from @fd.
+ *
+ * When a header is read (result > 0), copies the snapshot id and image path
+ * into @snapshot_id and @path (both PATHLEN-byte buffers; the copies are
+ * always NUL-terminated and truncated if longer), the open mode into @flags
+ * and the announced image size into @size.
+ *
+ * Returns pb_read_obj()'s result, converted to uint64_t.
+ */
+uint64_t read_remote_header(int fd, char *snapshot_id, char *path, int *flags, uint64_t *size)
+{
+	RemoteImageEntry *ri = NULL;
+	int ret = pb_read_obj(fd, (void **)&ri, PB_REMOTE_IMAGE);
+
+	if (ret > 0) {
+		/* strncpy() alone does not terminate an over-long string. */
+		strncpy(snapshot_id, ri->snapshot_id, PATHLEN - 1);
+		snapshot_id[PATHLEN - 1] = '\0';
+		strncpy(path, ri->name, PATHLEN - 1);
+		path[PATHLEN - 1] = '\0';
+		*flags = ri->open_mode;
+		*size = ri->size;
+	}
+	/* The unpacked message owns separately allocated strings; a plain
+	 * free() of the message would leak them.
+	 */
+	if (ri)
+		remote_image_entry__free_unpacked(ri, NULL);
+	return ret;
+}
+
+/* Allocate a worker descriptor whose wake-up semaphore starts at zero.
+ * The remaining fields are left for the caller to fill in.
+ * Returns NULL (after logging) on failure.
+ */
+static struct wthread *new_worker()
+{
+	struct wthread *wt = malloc(sizeof(*wt));
+
+	if (!wt) {
+		pr_perror("Unable to allocate worker thread structure");
+		return NULL;
+	}
+	if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
+		pr_perror("Workers semaphore init failed");
+		free(wt);
+		return NULL;
+	}
+	return wt;
+}
+
+/* Register @wt in the worker list, then post workers_semph so that
+ * join_workers() knows a worker is available.
+ */
+static void add_worker(struct wthread *wt)
+{
+	struct list_head *entry = &(wt->l);
+
+	pthread_mutex_lock(&workers_lock);
+	list_add_tail(entry, &workers_head);
+	pthread_mutex_unlock(&workers_lock);
+
+	sem_post(&workers_semph);
+}
+
+/* Reap worker threads forever: join the oldest registered worker, then
+ * unlink and free its descriptor; sleep on workers_semph while none are
+ * registered. This function does not return.
+ */
+void join_workers()
+{
+	struct wthread *wthread;
+
+	while (1) {
+		/* Other threads add to and walk this list (add_worker(),
+		 * get_wt_by_name(), unlock_workers()), so it is only inspected
+		 * and modified under workers_lock. The lock is NOT held across
+		 * pthread_join(): the joined thread may need it to finish.
+		 */
+		pthread_mutex_lock(&workers_lock);
+		if (list_empty(&workers_head)) {
+			pthread_mutex_unlock(&workers_lock);
+			sem_wait(&workers_semph);
+			continue;
+		}
+		wthread = list_entry(workers_head.next, struct wthread, l);
+		pthread_mutex_unlock(&workers_lock);
+
+		if (pthread_join(wthread->tid, NULL)) {
+			pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
+			continue;
+		}
+
+		pthread_mutex_lock(&workers_lock);
+		list_del(&(wthread->l));
+		pthread_mutex_unlock(&workers_lock);
+		free(wthread);
+	}
+}
+
+/* Allocate an empty in-memory image named (snapshot_id, path) holding a
+ * single empty buffer. The image is not added to the image list.
+ * Returns NULL (after logging) on failure; nothing is leaked in that case.
+ */
+static struct rimage *new_remote_image(char *path, char *snapshot_id)
+{
+	struct rimage *rimg = malloc(sizeof(*rimg));
+	struct rbuf *buf = malloc(sizeof(*buf));
+
+	if (rimg == NULL) {
+		pr_perror("Unable to allocate remote_image structures");
+		goto err;
+	}
+
+	if (buf == NULL) {
+		pr_perror("Unable to allocate remote_buffer structures");
+		goto err;
+	}
+
+	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
+		pr_perror("Remote image in_use mutex init failed");
+		goto err;
+	}
+
+	/* strncpy() alone does not terminate an over-long name. */
+	strncpy(rimg->path, path, PATHLEN - 1);
+	rimg->path[PATHLEN - 1] = '\0';
+	strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
+	rimg->snapshot_id[PATHLEN - 1] = '\0';
+	rimg->size = 0;
+	buf->nbytes = 0;
+	INIT_LIST_HEAD(&(rimg->buf_head));
+	list_add_tail(&(buf->l), &(rimg->buf_head));
+	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	rimg->curr_sent_bytes = 0;
+	return rimg;
+
+err:
+	free(buf);
+	free(rimg);
+	return NULL;
+}
+
+/* Reset @rimg to an empty image so it can be rewritten: free every buffer
+ * but the first, empty that one and rewind the send cursor. Takes
+ * rimg->in_use while doing so. Returns @rimg.
+ */
+static struct rimage *clear_remote_image(struct rimage *rimg)
+{
+	struct rbuf *first;
+
+	pthread_mutex_lock(&(rimg->in_use));
+
+	/* Release buffers from the tail until only the head buffer is left. */
+	while (!list_is_singular(&(rimg->buf_head))) {
+		struct list_head *last = rimg->buf_head.prev;
+
+		list_del(last);
+		free(list_entry(last, struct rbuf, l));
+	}
+
+	first = list_entry(rimg->buf_head.next, struct rbuf, l);
+	first->nbytes = 0;
+	rimg->size = 0;
+	rimg->curr_sent_buf = first;
+	rimg->curr_sent_bytes = 0;
+
+	pthread_mutex_unlock(&(rimg->in_use));
+	return rimg;
+}
+
+/* Get the image record that an incoming write to (snapshot_id, path) should
+ * fill. An existing record is unlinked from the image list (finalize_recv_rimg()
+ * links it back) and is either kept as is for O_APPEND or cleared for any
+ * other mode; otherwise a fresh record is created.
+ * Returns NULL only if creating a fresh record fails.
+ */
+struct rimage *prepare_remote_image(char *path, char *snapshot_id, int open_mode)
+{
+	struct rimage *existing = get_rimg_by_name(snapshot_id, path);
+
+	if (!existing)
+		return new_remote_image(path, snapshot_id);
+
+	pthread_mutex_lock(&rimg_lock);
+	list_del(&(existing->l));
+	pthread_mutex_unlock(&rimg_lock);
+
+	return open_mode == O_APPEND ? existing : clear_remote_image(existing);
+}
+
+/* Worker thread body serving one local CRIU connection; @ptr is the
+ * struct wthread describing it.
+ *
+ * Read requests (O_RDONLY) are answered through the installed
+ * wait_for_image() strategy and the image is streamed back. Any other mode
+ * (image proxy only) receives the image from CRIU and then forwards it to
+ * the image cache. Always returns NULL.
+ */
+static void *process_local_image_connection(void *ptr)
+{
+	struct wthread *wt = (struct wthread *) ptr;
+	struct rimage *rimg = NULL;
+	/* The image helpers return uint64_t; keep results signed so that
+	 * their -1 error value is detected.
+	 */
+	int64_t ret;
+
+	/* NOTE: the code inside this if is shared for both cache and proxy. */
+	if (wt->flags == O_RDONLY) {
+		rimg = wait_for_image(wt);
+		if (!rimg)
+			return NULL;
+
+		pthread_mutex_lock(&(rimg->in_use));
+		ret = (int64_t) send_image(wt->fd, rimg, wt->flags, true);
+		if (ret < 0)
+			pr_perror("Unable to send %s:%s to CRIU",
+			    rimg->path, rimg->snapshot_id);
+		else
+			pr_info("Finished sending %s:%s to CRIU (sent %llu bytes)\n",
+			    rimg->path, rimg->snapshot_id, (unsigned long long) ret);
+		pthread_mutex_unlock(&(rimg->in_use));
+		return NULL;
+	}
+
+	/* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
+	 * through TCP (see accept_remote_image_connections).
+	 *
+	 * Every exit below must call finalize_recv_rimg() and
+	 * finalize_fwd_rimg() once, balancing the counters raised when the
+	 * connection was accepted; otherwise the proxy never sees the dump as
+	 * fully forwarded and never closes the cache connection.
+	 */
+	rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
+	if (rimg == NULL) {
+		pr_perror("Unable to prepare %s:%s", wt->path, wt->snapshot_id);
+		close(wt->fd);
+		finalize_recv_rimg(NULL);
+		finalize_fwd_rimg();
+		return NULL;
+	}
+
+	ret = (int64_t) recv_image(wt->fd, rimg, 0, wt->flags, true);
+	if (ret < 0) {
+		pr_perror("Unable to receive %s:%s from CRIU",
+		    rimg->path, rimg->snapshot_id);
+		finalize_recv_rimg(NULL);
+		finalize_fwd_rimg();
+		return NULL;
+	}
+	finalize_recv_rimg(rimg);
+	pr_info("Finished receiving %s:%s (received %llu bytes)\n",
+		rimg->path, rimg->snapshot_id, (unsigned long long) ret);
+
+	pthread_mutex_lock(&proxy_to_cache_lock);
+	ret = (int64_t) forward_image(rimg);
+	pthread_mutex_unlock(&proxy_to_cache_lock);
+	finalize_fwd_rimg();
+	if (ret < 0) {
+		pr_perror("Unable to forward %s:%s to Image Cache",
+		    rimg->path, rimg->snapshot_id);
+		return NULL;
+	}
+
+	if (finished && !is_forwarding() && !is_receiving()) {
+		pr_info("Closing connection to Image Cache.\n");
+		close(proxy_to_cache_fd);
+		unlock_workers();
+	}
+	return NULL;
+}
+
+
+/* Accept local CRIU connections on the listening UNIX socket pointed to by
+ * @port. For each one, read the request header and start a worker thread
+ * running process_local_image_connection().
+ *
+ * A client whose header cannot be read is dropped and accepting continues.
+ * Returns NULL when accept(), worker allocation or thread creation fails.
+ */
+void *accept_local_image_connections(void *port)
+{
+	int fd = *((int *) port);
+	int cli_fd;
+	struct wthread *wt;
+
+	while (1) {
+		/* The peer address of a local (UNIX) connection is not used. */
+		cli_fd = accept(fd, NULL, NULL);
+		if (cli_fd < 0) {
+			pr_perror("Unable to accept local image connection");
+			return NULL;
+		}
+
+		wt = new_worker();
+		if (wt == NULL) {
+			close(cli_fd);
+			return NULL;
+		}
+		wt->fd = cli_fd;
+
+		/* read_header() returns uint64_t: convert to a signed type to
+		 * see its -1 error value. 0 means no header was read. In both
+		 * cases only this client is dropped.
+		 */
+		if ((int64_t) read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags)) <= 0) {
+			pr_perror("Error reading local image header");
+			close(cli_fd);
+			sem_destroy(&(wt->wakeup_sem));
+			free(wt);
+			continue;
+		}
+
+		pr_info("Received %s request for %s:%s\n",
+		    wt->flags == O_RDONLY ? "read" :
+			wt->flags == O_APPEND ? "append" : "write",
+		    wt->path, wt->snapshot_id);
+
+		/* These calls keep other threads from concluding that no more
+		 * images are coming.
+		 */
+		if (wt->flags != O_RDONLY) {
+			prepare_recv_rimg();
+			prepare_fwd_rimg();
+		}
+
+		if (pthread_create(
+		    &(wt->tid), NULL, process_local_image_connection, (void *) wt)) {
+			pr_perror("Unable to create worker thread");
+			/* Undo the counters raised above so that the end of the
+			 * dump can still be detected.
+			 */
+			if (wt->flags != O_RDONLY) {
+				finalize_recv_rimg(NULL);
+				finalize_fwd_rimg();
+			}
+			close(cli_fd);
+			sem_destroy(&(wt->wakeup_sem));
+			free(wt);
+			return NULL;
+		}
+		add_worker(wt);
+	}
+}
+
+/* Receive image data from @fd into @rimg's buffers.
+ *
+ * With @flags == O_APPEND, data goes after the current contents; otherwise
+ * writing starts at the first buffer.
+ *
+ * Note: size is a limit on how much we want to read from the socket: reading
+ * stops once rimg->size (the total image size) reaches it. Zero means read
+ * until the socket is closed.
+ *
+ * Returns the resulting rimg->size, or (uint64_t) -1 on error. @fd is
+ * closed before returning when @close_fd is true.
+ */
+uint64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
+{
+	struct rbuf *curr_buf;
+	uint64_t ret;
+	ssize_t n;
+
+	if (flags == O_APPEND)
+		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+	else
+		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+
+	while (1) {
+		size_t count = BUF_SIZE - curr_buf->nbytes;
+
+		if (size) {
+			/* Already holding everything that was announced. */
+			if (rimg->size >= size) {
+				ret = rimg->size;
+				break;
+			}
+			/* Compute the remainder in 64 bits: narrowing it to int
+			 * could go negative and turn into a huge read count.
+			 */
+			if (size - rimg->size < count)
+				count = size - rimg->size;
+		}
+
+		n = read(fd, curr_buf->buffer + curr_buf->nbytes, count);
+		if (n < 0) {
+			if (errno == EINTR)
+				continue;
+			pr_perror("Read on %s:%s socket failed",
+				rimg->path, rimg->snapshot_id);
+			ret = -1;
+			break;
+		}
+		if (n == 0) {
+			ret = rimg->size;
+			break;
+		}
+
+		curr_buf->nbytes += n;
+		rimg->size += n;
+		if (curr_buf->nbytes == BUF_SIZE) {
+			/* Keep a non-full buffer at the tail for the next read. */
+			struct rbuf *buf = malloc(sizeof(*buf));
+
+			if (buf == NULL) {
+				pr_perror("Unable to allocate remote_buffer structures");
+				ret = -1;
+				break;
+			}
+			buf->nbytes = 0;
+			list_add_tail(&(buf->l), &(rimg->buf_head));
+			curr_buf = buf;
+		}
+	}
+
+	if (close_fd)
+		close(fd);
+	return ret;
+}
+
+/* Send @rimg's buffered data over @fd.
+ *
+ * With @flags == O_APPEND, sending resumes where the previous call on this
+ * image stopped (rimg->curr_sent_buf / curr_sent_bytes); otherwise it starts
+ * from the beginning of the image.
+ *
+ * Returns rimg->size once everything has been sent, 0 if the peer closed
+ * the connection early, or (uint64_t) -1 on error. @fd is closed before
+ * returning when @close_fd is true.
+ */
+uint64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
+{
+	uint64_t ret;
+	ssize_t n;
+
+	if (flags != O_APPEND) {
+		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		rimg->curr_sent_bytes = 0;
+	}
+
+	while (1) {
+		n = send(
+		    fd,
+		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
+		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
+		    MSG_NOSIGNAL);
+		if (n < 0) {
+			if (errno == EINTR)
+				continue;
+			if (errno == EPIPE || errno == ECONNRESET) {
+				pr_warn("Connection for %s:%s was closed early than expected\n",
+					rimg->path, rimg->snapshot_id);
+				ret = 0;
+			} else {
+				pr_perror("Write on %s:%s socket failed",
+					rimg->path, rimg->snapshot_id);
+				ret = -1;
+			}
+			break;
+		}
+
+		rimg->curr_sent_bytes += n;
+		if (rimg->curr_sent_bytes == BUF_SIZE) {
+			/* Buffer fully sent: move on to the next one. */
+			rimg->curr_sent_buf =
+			    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
+			rimg->curr_sent_bytes = 0;
+		} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
+			/* Reached the end of the last (partially filled) buffer.
+			 * Report the total image size: counting only the blocks
+			 * sent by this call is wrong when an append resumes in
+			 * the middle of the image.
+			 */
+			ret = rimg->size;
+			break;
+		}
+	}
+
+	if (close_fd)
+		close(fd);
+	return ret;
+}
diff --git a/include/img-remote-proto.h b/include/img-remote-proto.h
new file mode 100644
index 0000000..93b1568
--- /dev/null
+++ b/include/img-remote-proto.h
@@ -0,0 +1,85 @@
+#ifndef IMAGE_REMOTE_PVT_H
+#define	IMAGE_REMOTE_PVT_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include "list.h"
+#include "img-remote.h"
+#include <pthread.h>
+#include <semaphore.h>
+
+/* Backlog passed to listen() for the daemon's server sockets. */
+#define DEFAULT_LISTEN 50
+#define PAGESIZE 4096
+/* Capacity of each in-memory chunk of an image. */
+#define BUF_SIZE PAGESIZE
+
+/* One chunk of an in-memory image. */
+struct rbuf {
+	char buffer[BUF_SIZE];
+	int nbytes; /* How many bytes are in the buffer. */
+	struct list_head l;
+};
+
+/* An image kept in memory, identified by (snapshot_id, path). */
+struct rimage {
+	char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	struct list_head l;
+	struct list_head buf_head;
+	/* Used to track already sent buffers when the image is appended. */
+	struct rbuf *curr_sent_buf;
+	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
+	int curr_sent_bytes;
+	uint64_t size; /* number of bytes */
+	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
+};
+
+/* State of a worker thread serving one local (CRIU) request. */
+struct wthread {
+	pthread_t tid;
+	struct list_head l;
+	/* Client fd. */
+	int fd;
+	/* The path and snapshot_id identify the request handled by this thread. */
+	char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	int flags;
+	/* This semph is used to wake this thread if the image is in memory.*/
+	sem_t wakeup_sem;
+};
+
+/* This variable is used to indicate when the dump is finished. */
+extern bool finished;
+/* This is the proxy to cache TCP socket FD. */
+extern int proxy_to_cache_fd;
+
+/* Install the cache- or proxy-specific read strategy @wfi and set up the
+ * shared synchronization objects. Returns 0 on success, -1 on failure.
+ */
+int init_daemon(struct rimage *(*wfi)(struct wthread*));
+
+void join_workers(void);
+void unlock_workers(void);
+
+void prepare_recv_rimg(void);
+void finalize_recv_rimg(struct rimage *rimg);
+struct rimage *prepare_remote_image(char *path, char *snapshot_id, int flags);
+struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path);
+bool is_receiving(void);
+
+void *accept_local_image_connections(void *ptr);
+void *accept_remote_image_connections(void *ptr);
+
+/*
+ * The functions below return uint64_t. send_image(), recv_image() and
+ * forward_image() report errors as (uint64_t) -1, so convert their result
+ * to a signed type before testing for a negative value.
+ */
+uint64_t forward_image(struct rimage *rimg);
+uint64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd);
+uint64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd);
+
+uint64_t pb_write_obj(int fd, void *obj, int type);
+uint64_t pb_read_obj(int fd, void **obj, int type);
+
+uint64_t write_header(int fd, char *snapshot_id, char *path, int open_mode);
+uint64_t read_header(int fd, char *snapshot_id, char *path, int *open_mode);
+uint64_t write_reply_header(int fd, int error);
+uint64_t read_reply_header(int fd, int *error);
+uint64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
+uint64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);
+
+int setup_TCP_server_socket(int port);
+int setup_TCP_client_socket(char *hostname, int port);
+int setup_UNIX_client_socket(char *path);
+int setup_UNIX_server_socket(char *path);
+#endif
+
-- 
1.9.1


-- 
Rodrigo Bruno <rbruno at gsd.inesc-id.pt>


More information about the CRIU mailing list