[CRIU] Process Migration using Sockets v3 - Patch 2/2
    Rodrigo Bruno 
    rbruno at gsd.inesc-id.pt
       
    Wed Nov  4 04:20:59 PST 2015
    
    
  
Hi, 
here goes the second part of the patch.
Main changes since the previous version:
- Images are forwarded from the Image Proxy to the Image Cache over a single
TCP connection, which can also be supplied through the opts structure;
- Both Image Cache and Image Proxy only open one UNIX socket for CRIU. This
socket can be used to read, write, or append images;
- changed file names
image-proxy.c -> img-proxy.c, 
image-cache.c -> img-cache.c
image-remote-pvt.c -> img-remote-proto.c
include/image-remote-pvt.h -> include/img-remote-proto.h
- removed the use of typedefs.
I also revised all the code to make it more readable.
Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
>From 03aacae6cf0e17a1175c3fc95767515218f1ccfd Mon Sep 17 00:00:00 2001
From: rodrigo-bruno <rbruno at gsd.inesc-id.pt>
Date: Wed, 4 Nov 2015 11:57:51 +0000
Subject: [PATCH] Process migration using sockets (2/2)
---
 Makefile                   |   1 +
 img-cache.c                | 116 +++++++
 img-proxy.c                |  92 ++++++
 img-remote-proto.c         | 735 +++++++++++++++++++++++++++++++++++++++++++++
 include/img-remote-proto.h |  69 +++++
 5 files changed, 1013 insertions(+)
 create mode 100644 img-cache.c
 create mode 100644 img-proxy.c
 create mode 100644 img-remote-proto.c
 create mode 100644 include/img-remote-proto.h
diff --git a/Makefile b/Makefile
index fdc6830..ab91f37 100644
--- a/Makefile
+++ b/Makefile
@@ -35,6 +35,7 @@ OBJCOPY		:= $(CROSS_COMPILE)objcopy
 
 CFLAGS		+= $(USERCFLAGS)
 
+DEBUG := 1
 #
 # Fetch ARCH from the uname if not yet set
 #
diff --git a/img-cache.c b/img-cache.c
new file mode 100644
index 0000000..be8361d
--- /dev/null
+++ b/img-cache.c
@@ -0,0 +1,116 @@
+#include <unistd.h>
+
+#include "img-remote.h"
+#include "img-remote-proto.h"
+#include "criu-log.h"
+#include <pthread.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+#include "cr_options.h"
+
+/* The image cache creates a thread that calls this function. It accepts the
+ * connection coming from the image proxy and then receives images over it
+ * until the proxy closes the connection.
+ *
+ * 'port' points to an int holding the listening socket fd (despite its name).
+ * Always returns NULL. */
+void* accept_remote_image_connections(void* port)
+{
+	int fd = *((int*) port);
+	struct sockaddr_in cli_addr;
+	socklen_t clilen = sizeof (cli_addr);
+	char snapshot_id_buf[PATHLEN], path_buf[PATHLEN];
+	uint64_t size;
+	/* read_remote_header() and recv_image() return uint64_t but report
+	 * failure as -1. The result is kept in a signed variable, otherwise
+	 * the '< 0' checks below could never be true. */
+	int64_t ret;
+	int flags, proxy_fd;
+	struct rimage* rimg;
+
+	proxy_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
+	if (proxy_fd < 0) {
+		pr_perror("Unable to accept remote image connection from image proxy");
+		return NULL;
+	}
+	while (1) {
+		ret = read_remote_header(proxy_fd, snapshot_id_buf, path_buf, &flags, &size);
+		if (ret < 0) {
+			pr_perror("Unable to receive remote header from image proxy");
+			break;
+		}
+		/* This means that no more images are coming. */
+		else if (!ret) {
+			pr_info("Image Proxy connection closed.\n");
+			finished = true;
+			unlock_workers();
+			break;
+		}
+
+		pr_info("Received %s request for %s:%s\n",
+		    flags == O_RDONLY ? "read" :
+			flags == O_APPEND ? "append" : "write",
+		    path_buf, snapshot_id_buf);
+
+		rimg = prepare_remote_image(path_buf, snapshot_id_buf, flags);
+		if (rimg == NULL) {
+			pr_perror("Unable to prepare remote image %s:%s",
+				path_buf, snapshot_id_buf);
+			break;
+		}
+
+		prepare_recv_rimg();
+		/* recv_image() treats size == 0 as "read until the peer closes".
+		 * On this shared connection that would swallow every following
+		 * header and image, so an empty image is not read at all. */
+		ret = size ? recv_image(proxy_fd, rimg, size, flags, false) : 0;
+		if (ret < 0) {
+			pr_perror("Unable to receive %s:%s from image proxy",
+				rimg->path, rimg->snapshot_id);
+			finalize_recv_rimg(NULL);
+			break;
+		}
+		else if ((uint64_t) ret != size) {
+			pr_perror("Unable to receive %s:%s from image proxy (received %lu bytes, expected %lu bytes)",
+			    rimg->path, rimg->snapshot_id,
+			    (unsigned long) ret, (unsigned long) size);
+			finalize_recv_rimg(NULL);
+			break;
+		}
+		finalize_recv_rimg(rimg);
+
+		pr_info("Finished receiving %s:%s (received %lu bytes)\n",
+			rimg->path, rimg->snapshot_id, (unsigned long) ret);
+	}
+
+	/* The accepted connection is owned by this thread on every exit path. */
+	close(proxy_fd);
+	return NULL;
+}
+
+/* Entry point of the image cache.
+ *
+ * Obtains the socket on which the image proxy connects (opts.ps_socket when
+ * it is set, otherwise a new TCP server socket on 'cache_write_port'), opens
+ * the UNIX server socket at READ_IMG_PATH, initializes the shared
+ * synchronization structures and starts one thread per socket.
+ *
+ * Returns -1 if any set-up step fails, 0 after the threads were joined.
+ * NOTE(review): join_workers(), as defined in img-remote-proto.c of this
+ * patch, loops without an exit path, so the pthread_join() calls and the
+ * 'return 0' below look unreachable - confirm. */
+int image_cache(unsigned short cache_write_port)
+{
+	pthread_t local_req_thr, remote_req_thr;
+	int local_req_fd, remote_req_fd;
+
+	pr_info("Proxy to Cache Port %d, CRIU to Cache Path %s\n", cache_write_port, CACHE_IMG_PATH);
+
+
+	/* A socket handed over through the options takes precedence over
+	 * opening a new listening socket. */
+	if (opts.ps_socket != -1) {
+		remote_req_fd = opts.ps_socket;
+		pr_info("Re-using ps socket %d\n", remote_req_fd);
+	}
+	else {
+		remote_req_fd = setup_TCP_server_socket(cache_write_port);
+		if (remote_req_fd < 0) {
+			pr_perror("Unable to open proxy to cache TCP socket");
+			return -1;
+		}
+	}
+
+	local_req_fd = setup_UNIX_server_socket(READ_IMG_PATH);
+	if (local_req_fd < 0) {
+		pr_perror("Unable to open cache to proxy UNIX socket");
+		return -1;
+	}
+
+	/* Locks and semaphores must exist before any thread is started. */
+	if (init_daemon())
+		return -1;
+
+	/* Both threads receive a pointer to a local variable of this function;
+	 * the threads read it once at start-up. */
+	if (pthread_create(
+	    &remote_req_thr, NULL, accept_remote_image_connections, (void*) &remote_req_fd)) {
+		pr_perror("Unable to create remote requests thread");
+		return -1;
+	}
+	if (pthread_create(
+	    &local_req_thr, NULL, accept_local_image_connections, (void*) &local_req_fd)) {
+		pr_perror("Unable to create local requests thread");
+		return -1;
+	}
+
+	join_workers();
+
+	pthread_join(remote_req_thr, NULL);
+	pthread_join(local_req_thr, NULL);
+	return 0;
+}
diff --git a/img-proxy.c b/img-proxy.c
new file mode 100644
index 0000000..f4812ee
--- /dev/null
+++ b/img-proxy.c
@@ -0,0 +1,92 @@
+#include <unistd.h>
+
+#include "img-remote.h"
+#include "img-remote-proto.h"
+#include "criu-log.h"
+#include <pthread.h>
+#include <fcntl.h>
+#include "cr_options.h"
+
+int proxy_to_cache_fd;
+
+/* Forwards one image to the image cache through the proxy to cache socket
+ * (proxy_to_cache_fd): a remote header first, then the image data.
+ *
+ * A request whose path is DUMP_FINISH is not forwarded; it only sets the
+ * global 'finished' flag.
+ *
+ * Returns the value reported by send_image() on success (0 for DUMP_FINISH)
+ * and (uint64_t) -1 on failure. Because the return type is unsigned, callers
+ * have to convert the result to a signed type before testing it for '< 0'. */
+uint64_t forward_image(struct rimage* rimg)
+{
+	/* Signed on purpose: the helpers return uint64_t but signal failure
+	 * with -1, which an unsigned variable can never report as '< 0'. */
+	int64_t ret;
+	int fd = proxy_to_cache_fd;
+
+	if (!strncmp(rimg->path, DUMP_FINISH, sizeof (DUMP_FINISH))) {
+		finished = true;
+		return 0;
+	}
+
+	/* The image stays locked while its header and data are on the wire. */
+	pthread_mutex_lock(&(rimg->in_use));
+	ret = write_remote_header(
+		fd, rimg->snapshot_id, rimg->path, O_APPEND, rimg->size);
+	if (ret < 0) {
+		pr_perror("Error writing header for %s:%s",
+			rimg->path, rimg->snapshot_id);
+		goto err;
+	}
+
+	ret = send_image(fd, rimg, O_APPEND, false);
+	if (ret < 0) {
+		pr_perror("Unable to send %s:%s to image cache",
+			rimg->path, rimg->snapshot_id);
+		goto err;
+	}
+	if ((uint64_t) ret != rimg->size) {
+		pr_perror("Unable to send %s:%s to image cache (sent %lu bytes, expected %lu bytes)",
+		    rimg->path, rimg->snapshot_id,
+		    (unsigned long) ret, (unsigned long) rimg->size);
+		goto err;
+	}
+	pthread_mutex_unlock(&(rimg->in_use));
+
+	pr_info("Finished forwarding %s:%s (sent %lu bytes)\n",
+	    rimg->path, rimg->snapshot_id, (unsigned long) rimg->size);
+	return ret;
+
+err:
+	pthread_mutex_unlock(&(rimg->in_use));
+	return -1;
+}
+
+/* Entry point of the image proxy.
+ *
+ * Opens the UNIX server socket at PROXY_IMG_PATH, obtains the connection to
+ * the image cache (opts.ps_socket when it is set, otherwise a new TCP
+ * connection to fwd_host:fwd_port), initializes the shared synchronization
+ * structures and starts the thread that accepts local connections.
+ *
+ * Returns -1 if a set-up step fails; sockets opened by this function are
+ * closed in that case (a socket taken from opts.ps_socket is left alone).
+ * Returns 0 after the accepting thread was joined. */
+int image_proxy(char* fwd_host, unsigned short fwd_port)
+{
+	pthread_t local_req_thr;
+	int local_req_fd;
+	/* Set only when the cache connection was opened here. */
+	int own_cache_fd = 0;
+
+	pr_info("CRIU to Proxy Path: %s, Cache Address %s:%hu\n",
+		PROXY_IMG_PATH, fwd_host, fwd_port);
+
+	local_req_fd = setup_UNIX_server_socket(PROXY_IMG_PATH);
+	if (local_req_fd < 0) {
+		pr_perror("Unable to open CRIU to proxy UNIX socket");
+		return -1;
+	}
+
+	if (opts.ps_socket != -1) {
+		proxy_to_cache_fd = opts.ps_socket;
+		pr_info("Re-using ps socket %d\n", proxy_to_cache_fd);
+	}
+	else {
+		proxy_to_cache_fd = setup_TCP_client_socket(fwd_host, fwd_port);
+		if (proxy_to_cache_fd < 0) {
+			pr_perror("Unable to open proxy to cache TCP socket");
+			goto err;
+		}
+		own_cache_fd = 1;
+	}
+
+	/* Locks and semaphores must exist before any thread is started. */
+	if (init_daemon())
+		goto err;
+
+	if (pthread_create(
+	    &local_req_thr, NULL, accept_local_image_connections, (void*) &local_req_fd)) {
+		pr_perror("Unable to create local requests thread");
+		goto err;
+	}
+
+	join_workers();
+
+	pthread_join(local_req_thr, NULL);
+	return 0;
+
+err:
+	if (own_cache_fd)
+		close(proxy_to_cache_fd);
+	close(local_req_fd);
+	return -1;
+}
diff --git a/img-remote-proto.c b/img-remote-proto.c
new file mode 100644
index 0000000..e9dd16e
--- /dev/null
+++ b/img-remote-proto.c
@@ -0,0 +1,735 @@
+#include <unistd.h>
+#include <stdlib.h>
+
+#include <semaphore.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include "sys/un.h"
+#include <pthread.h>
+#include <fcntl.h>
+
+#include "img-remote-proto.h"
+#include "criu-log.h"
+
+#include "protobuf.h"
+#include "protobuf/remote-image.pb-c.h"
+#include "image.h"
+
+/* Book-keeping for one worker thread. Each accepted local connection gets
+ * one of these; it is linked into 'workers_head' through the 'l' member. */
+typedef struct wthread {
+	/* Thread id, filled in after pthread_create() and used by join_workers(). */
+	pthread_t tid;
+	/* Link in the global workers list. */
+	struct list_head l;
+	/* Client fd. */
+	int fd;
+	/* The path and snapshot_id identify the request handled by this thread. */
+	char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	/* Open mode of the request as read from the local header (compared
+	 * against O_RDONLY and O_APPEND by the code that uses it). */
+	int flags;
+	/* This semaphore is used to wake this thread when the image it waits
+	 * for is in memory, or when no more images will come. */
+	sem_t wakeup_sem;
+} worker_thread;
+
+LIST_HEAD(rimg_head);
+pthread_mutex_t rimg_lock;
+
+pthread_mutex_t proxy_to_cache_lock;
+
+LIST_HEAD(workers_head);
+pthread_mutex_t workers_lock;
+sem_t workers_semph;
+
+bool finished = false;
+int writing = 0;
+int forwarding = 0;
+
+/* Searches the global image list for the image identified by 'snapshot_id'
+ * and 'path'. Returns the matching image, or NULL when there is none.
+ * The list lock is only held during the search; the returned pointer is not
+ * protected by it. */
+static struct rimage* get_rimg_by_name(const char* snapshot_id, const char* path)
+{
+	struct rimage* cursor;
+	struct rimage* found = NULL;
+
+	pthread_mutex_lock(&rimg_lock);
+	list_for_each_entry(cursor, &rimg_head, l) {
+		if (strncmp(cursor->path, path, PATHLEN))
+			continue;
+		if (strncmp(cursor->snapshot_id, snapshot_id, PATHLEN))
+			continue;
+		found = cursor;
+		break;
+	}
+	pthread_mutex_unlock(&rimg_lock);
+
+	return found;
+}
+
+/* Searches the global workers list for the first worker whose request is
+ * identified by 'snapshot_id' and 'path'. Returns it, or NULL when there is
+ * none. The workers lock is only held during the search. */
+static worker_thread* get_wt_by_name(const char* snapshot_id, const char* path)
+{
+	worker_thread* cursor;
+	worker_thread* found = NULL;
+
+	pthread_mutex_lock(&workers_lock);
+	list_for_each_entry(cursor, &workers_head, l) {
+		if (strncmp(cursor->path, path, PATHLEN))
+			continue;
+		if (strncmp(cursor->snapshot_id, snapshot_id, PATHLEN))
+			continue;
+		found = cursor;
+		break;
+	}
+	pthread_mutex_unlock(&workers_lock);
+
+	return found;
+}
+
+/* Initializes the three global mutexes (image list, proxy to cache
+ * connection, workers list) and the workers semaphore, in that order.
+ * Returns 0 on success and -1 as soon as one initialization fails. */
+static int init_sync_structures()
+{
+	if (pthread_mutex_init(&rimg_lock, NULL)) {
+		pr_perror("Remote image list mutex init failed");
+		return -1;
+	}
+	if (pthread_mutex_init(&proxy_to_cache_lock, NULL)) {
+		pr_perror("Remote image connection mutex init failed");
+		return -1;
+	}
+	if (pthread_mutex_init(&workers_lock, NULL)) {
+		pr_perror("Workers mutex init failed");
+		return -1;
+	}
+	/* Starts at zero: join_workers() blocks until a worker is added. */
+	if (sem_init(&workers_semph, 0, 0)) {
+		pr_perror("Workers semaphore init failed");
+		return -1;
+	}
+	return 0;
+}
+
+/* Announces that one more image is being received: increments the 'writing'
+ * counter under the image list lock. Paired with finalize_recv_rimg(). */
+void prepare_recv_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	++writing;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* Ends a receive announced with prepare_recv_rimg(): decrements the
+ * 'writing' counter and, when 'rimg' is not NULL, appends the image to the
+ * global image list and wakes the worker found by get_wt_by_name() for that
+ * image, if any. Pass NULL when the receive failed. */
+void finalize_recv_rimg(struct rimage* rimg)
+{
+	worker_thread* waiter;
+
+	pthread_mutex_lock(&rimg_lock);
+	if (rimg != NULL)
+		list_add_tail(&(rimg->l), &rimg_head);
+	writing--;
+	pthread_mutex_unlock(&rimg_lock);
+
+	if (rimg == NULL)
+		return;
+
+	/* Wake thread waiting for this image. */
+	waiter = get_wt_by_name(rimg->snapshot_id, rimg->path);
+	if (waiter != NULL)
+		sem_post(&(waiter->wakeup_sem));
+}
+
+/* Tells whether at least one image receive is in progress, i.e. whether the
+ * 'writing' counter (read under the image list lock) is positive. */
+static bool is_receiving()
+{
+	int in_progress;
+
+	pthread_mutex_lock(&rimg_lock);
+	in_progress = writing;
+	pthread_mutex_unlock(&rimg_lock);
+
+	if (in_progress > 0)
+		return true;
+	return false;
+}
+
+/* Announces that one more image is going to be forwarded: increments the
+ * 'forwarding' counter under the image list lock. */
+static void prepare_fwd_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	++forwarding;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* Counterpart of prepare_fwd_rimg(): decrements the 'forwarding' counter
+ * under the image list lock. */
+static void finalize_fwd_rimg()
+{
+	pthread_mutex_lock(&rimg_lock);
+	--forwarding;
+	pthread_mutex_unlock(&rimg_lock);
+}
+
+/* Tells whether at least one image forward is pending, i.e. whether the
+ * 'forwarding' counter (read under the image list lock) is positive. */
+static bool is_forwarding()
+{
+	int pending;
+
+	pthread_mutex_lock(&rimg_lock);
+	pending = forwarding;
+	pthread_mutex_unlock(&rimg_lock);
+
+	if (pending > 0)
+		return true;
+	return false;
+}
+
+/* Called when no more images are coming. Posts the wake-up semaphore of
+ * every registered worker, so that threads still waiting for an image wake
+ * up and can answer ENOENT (no such file) to the requester. */
+void unlock_workers()
+{
+	worker_thread* waiter;
+
+	pthread_mutex_lock(&workers_lock);
+	list_for_each_entry(waiter, &workers_head, l) {
+		sem_post(&(waiter->wakeup_sem));
+	}
+	pthread_mutex_unlock(&workers_lock);
+}
+
+/* Common initialization for the image proxy and the image cache. Currently
+ * it only sets up the global locks and semaphore.
+ * Returns 0 on success, -1 on failure. */
+int init_daemon()
+{
+	return init_sync_structures();
+}
+
+/* Creates a TCP socket bound to 'port' on INADDR_ANY, with SO_REUSEADDR set,
+ * and puts it in listening state with a backlog of DEFAULT_LISTEN.
+ * Returns the listening fd, or -1 on failure (the socket is closed then). */
+int setup_TCP_server_socket(int port)
+{
+	struct sockaddr_in serv_addr;
+	int sockopt = 1;
+	int sockfd;
+
+	sockfd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open image socket");
+		return -1;
+	}
+
+	memset(&serv_addr, 0, sizeof (serv_addr));
+	serv_addr.sin_family = AF_INET;
+	serv_addr.sin_addr.s_addr = INADDR_ANY;
+	serv_addr.sin_port = htons(port);
+
+	if (setsockopt(
+	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof (sockopt)) == -1) {
+		pr_perror("Unable to set SO_REUSEADDR");
+		goto err;
+	}
+
+	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
+		pr_perror("Unable to bind image socket");
+		goto err;
+	}
+
+	if (listen(sockfd, DEFAULT_LISTEN)) {
+		pr_perror("Unable to listen image socket");
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	/* Do not leak the descriptor when the socket cannot be used. */
+	close(sockfd);
+	return -1;
+}
+
+/* Opens a TCP connection to 'hostname':'port'. The host is resolved with
+ * gethostbyname() and its first address is used.
+ * Returns the connected fd, or -1 on failure (the socket is closed then). */
+int setup_TCP_client_socket(char* hostname, int port)
+{
+	int sockfd;
+	struct sockaddr_in serv_addr;
+	struct hostent *server;
+
+	sockfd = socket(AF_INET, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open remote image socket");
+		return -1;
+	}
+
+	/* NOTE(review): gethostbyname() reports failures through h_errno, so
+	 * the errno text appended by pr_perror() may be unrelated. */
+	server = gethostbyname(hostname);
+	if (server == NULL) {
+		pr_perror("Unable to get host by name (%s)", hostname);
+		goto err;
+	}
+
+	memset(&serv_addr, 0, sizeof (serv_addr));
+	serv_addr.sin_family = AF_INET;
+	memcpy(&serv_addr.sin_addr.s_addr, server->h_addr, server->h_length);
+	serv_addr.sin_port = htons(port);
+
+	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+		pr_perror("Unable to connect to remote %s", hostname);
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	/* Do not leak the descriptor when the connection cannot be set up. */
+	close(sockfd);
+	return -1;
+}
+
+/* Creates a UNIX stream socket bound to 'path' and puts it in listening
+ * state with a backlog of DEFAULT_LISTEN. Any existing file at 'path' is
+ * unlinked first.
+ * Returns the listening fd, or -1 on failure (the socket is closed then). */
+int setup_UNIX_server_socket(char* path)
+{
+	struct sockaddr_un addr;
+	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open image socket");
+		return -1;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	/* The memset above guarantees the copied path stays terminated. */
+	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
+
+	unlink(path);
+
+	if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
+		pr_perror("Unable to bind image socket");
+		goto err;
+	}
+
+	if (listen(sockfd, DEFAULT_LISTEN) == -1) {
+		pr_perror("Unable to listen image socket");
+		goto err;
+	}
+
+	return sockfd;
+
+err:
+	/* Do not leak the descriptor when the socket cannot be used. */
+	close(sockfd);
+	return -1;
+}
+
+/* Connects a UNIX stream socket to the server listening at 'path'.
+ * Returns the connected fd, or -1 on failure (the socket is closed then). */
+int setup_UNIX_client_socket(char* path)
+{
+	struct sockaddr_un addr;
+	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (sockfd < 0) {
+		pr_perror("Unable to open local image socket");
+		return -1;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = AF_UNIX;
+	/* The memset above guarantees the copied path stays terminated. */
+	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
+
+	if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+		pr_perror("Unable to connect to local socket: %s", path);
+		/* Do not leak the descriptor when the connection fails. */
+		close(sockfd);
+		return -1;
+	}
+
+	return sockfd;
+}
+
+/* Writes the protobuf object 'obj' of image type 'type' to the raw fd 'fd'
+ * by wrapping the fd in a temporary cr_img and calling pb_write_one().
+ * Returns the result of pb_write_one() converted to uint64_t.
+ * NOTE(review): only _x.fd is set before bfd_setraw(); the other members of
+ * 'img' are left uninitialized - confirm pb_write_one() does not read them. */
+uint64_t pb_write_obj(int fd, void* obj, int type)
+{
+	struct cr_img img;
+	img._x.fd = fd;
+	bfd_setraw(&img._x);
+	return pb_write_one(&img, obj, type);
+}
+
+/* Reads one protobuf object of image type 'type' from the raw fd 'fd' by
+ * wrapping the fd in a temporary cr_img and calling do_pb_read_one() with
+ * its last argument set to true. The object is stored through 'pobj'.
+ * Returns the result of do_pb_read_one() converted to uint64_t; the callers
+ * in this file treat a positive value as "object read".
+ * NOTE(review): only _x.fd is set before bfd_setraw(); the other members of
+ * 'img' are left uninitialized - confirm do_pb_read_one() does not read them. */
+uint64_t pb_read_obj(int fd, void** pobj, int type)
+{
+	struct cr_img img;
+	img._x.fd = fd;
+	bfd_setraw(&img._x);
+	return do_pb_read_one(&img, pobj, type, true);
+}
+
+/* Sends a local image request header (image name, snapshot id and open
+ * mode) on 'fd'. The strings are referenced, not copied.
+ * Returns the result of pb_write_obj(). */
+uint64_t write_header(int fd, char* snapshot_id, char* path, int flags)
+{
+	LocalImageEntry li = LOCAL_IMAGE_ENTRY__INIT;
+	li.name = path;
+	li.snapshot_id = snapshot_id;
+	li.open_mode = flags;
+	return pb_write_obj(fd, &li, PB_LOCAL_IMAGE);
+}
+
+/* Sends a reply header carrying 'error' on 'fd'. The callers in this file
+ * pass 0 when the image is available and ENOENT when it is not.
+ * Returns the result of pb_write_obj(). */
+uint64_t write_reply_header(int fd, int error)
+{
+	LocalImageReplyEntry lir = LOCAL_IMAGE_REPLY_ENTRY__INIT;
+	lir.error = error;
+	return pb_write_obj(fd, &lir, PB_LOCAL_IMAGE_REPLY);
+}
+
+/* Sends a remote image header (image name, snapshot id, open mode and size
+ * in bytes) on 'fd'. The strings are referenced, not copied.
+ * Returns the result of pb_write_obj(). */
+uint64_t write_remote_header(int fd, char* snapshot_id, char* path, int flags, uint64_t size)
+{
+	RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
+	ri.name = path;
+	ri.snapshot_id = snapshot_id;
+	ri.open_mode = flags;
+	ri.size = size;
+	return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
+}
+
+/* Reads a local image request header from 'fd'.
+ *
+ * When an entry was read (positive result), 'snapshot_id' and 'path' receive
+ * the NUL-terminated strings (truncated to PATHLEN - 1 characters) and
+ * 'flags' the open mode; both buffers must hold at least PATHLEN bytes.
+ * Otherwise the outputs are left untouched.
+ *
+ * Returns the result of pb_read_obj() converted to uint64_t: a negative
+ * result shows up as a huge value, so callers must convert it to a signed
+ * type before testing for '< 0'. */
+uint64_t read_header(int fd, char* snapshot_id, char* path, int* flags)
+{
+	/* Start from NULL: nothing visible here guarantees that pb_read_obj()
+	 * stores a pointer on every failure path, and free() is called below. */
+	LocalImageEntry* li = NULL;
+	int ret = pb_read_obj(fd, (void**)&li, PB_LOCAL_IMAGE);
+	if (ret > 0) {
+		/* strncpy() alone does not terminate on truncation. */
+		strncpy(snapshot_id, li->snapshot_id, PATHLEN - 1);
+		snapshot_id[PATHLEN - 1] = '\0';
+		strncpy(path, li->name, PATHLEN - 1);
+		path[PATHLEN - 1] = '\0';
+		*flags = li->open_mode;
+	}
+	/* NOTE(review): an unpacked protobuf-c message normally owns its
+	 * strings; plain free() may leak them - confirm how it was allocated. */
+	free(li);
+	return ret;
+}
+
+/* Reads a reply header from 'fd' and, when an entry was read (positive
+ * result), stores its error code in 'error'.
+ *
+ * Returns the result of pb_read_obj() converted to uint64_t: a negative
+ * result shows up as a huge value, so callers must convert it to a signed
+ * type before testing for '< 0'. */
+uint64_t read_reply_header(int fd, int* error)
+{
+	/* Start from NULL: nothing visible here guarantees that pb_read_obj()
+	 * stores a pointer on every failure path, and free() is called below. */
+	LocalImageReplyEntry* lir = NULL;
+	int ret = pb_read_obj(fd, (void**)&lir, PB_LOCAL_IMAGE_REPLY);
+	if (ret > 0) {
+		*error = lir->error;
+	}
+	free(lir);
+	return ret;
+}
+
+/* Reads a remote image header from 'fd'.
+ *
+ * When an entry was read (positive result), 'snapshot_id' and 'path' receive
+ * the NUL-terminated strings (truncated to PATHLEN - 1 characters), 'flags'
+ * the open mode and 'size' the announced size; both buffers must hold at
+ * least PATHLEN bytes. Otherwise the outputs are left untouched.
+ *
+ * Returns the result of pb_read_obj() converted to uint64_t: a negative
+ * result shows up as a huge value, so callers must convert it to a signed
+ * type before testing for '< 0'. */
+uint64_t read_remote_header(int fd, char* snapshot_id, char* path, int* flags, uint64_t* size)
+{
+	/* Start from NULL: nothing visible here guarantees that pb_read_obj()
+	 * stores a pointer on every failure path, and free() is called below. */
+	RemoteImageEntry* ri = NULL;
+	int ret = pb_read_obj(fd, (void**)&ri, PB_REMOTE_IMAGE);
+	if (ret > 0) {
+		/* strncpy() alone does not terminate on truncation. */
+		strncpy(snapshot_id, ri->snapshot_id, PATHLEN - 1);
+		snapshot_id[PATHLEN - 1] = '\0';
+		strncpy(path, ri->name, PATHLEN - 1);
+		path[PATHLEN - 1] = '\0';
+		*flags = ri->open_mode;
+		*size = ri->size;
+	}
+	/* NOTE(review): an unpacked protobuf-c message normally owns its
+	 * strings; plain free() may leak them - confirm how it was allocated. */
+	free(ri);
+	return ret;
+}
+
+/* Allocates a worker structure and initializes its wake-up semaphore to 0.
+ * The remaining members are left uninitialized for the caller to fill in.
+ * Returns the new worker, or NULL on failure (nothing is leaked then). */
+static worker_thread* new_worker()
+{
+	worker_thread* wt = malloc(sizeof(worker_thread));
+	if (!wt) {
+		pr_perror("Unable to allocate worker thread structure");
+		/* Without this return the NULL pointer was dereferenced below. */
+		return NULL;
+	}
+	if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
+		pr_perror("Workers semaphore init failed");
+		free(wt);
+		return NULL;
+	}
+	return wt;
+}
+
+/* Appends 'wt' to the global workers list (under the workers lock) and
+ * posts 'workers_semph' so that join_workers() notices the new entry. */
+static void add_worker(worker_thread* wt)
+{
+	pthread_mutex_lock(&workers_lock);
+	list_add_tail(&(wt->l), &workers_head);
+	pthread_mutex_unlock(&workers_lock);
+
+	sem_post(&workers_semph);
+}
+
+/* Joins worker threads in the order they were registered and releases their
+ * book-keeping structures. Blocks on 'workers_semph' while the workers list
+ * is empty. This loop has no exit path: the function never returns.
+ *
+ * Every access to the workers list is done under 'workers_lock', because
+ * add_worker(), get_wt_by_name() and unlock_workers() use the same list from
+ * other threads. The lock is not held across pthread_join(). */
+void join_workers()
+{
+	worker_thread* wthread = NULL;
+	while (1) {
+		pthread_mutex_lock(&workers_lock);
+		if (list_empty(&workers_head)) {
+			pthread_mutex_unlock(&workers_lock);
+			sem_wait(&workers_semph);
+			continue;
+		}
+		wthread = list_entry(workers_head.next, worker_thread, l);
+		pthread_mutex_unlock(&workers_lock);
+
+		if (pthread_join(wthread->tid, NULL)) {
+			pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
+			continue;
+		}
+
+		pthread_mutex_lock(&workers_lock);
+		list_del(&(wthread->l));
+		pthread_mutex_unlock(&workers_lock);
+		free(wthread);
+	}
+}
+
+/* Allocates an empty remote image named by 'path' and 'snapshot_id' (both
+ * truncated to PATHLEN - 1 characters) with a single empty buffer and an
+ * initialized 'in_use' mutex. The image is not linked into the global image
+ * list. Returns the image, or NULL on failure (nothing is leaked then). */
+static struct rimage* new_remote_image(char* path, char* snapshot_id)
+{
+	struct rimage* rimg;
+	struct rbuf* buf;
+
+	rimg = malloc(sizeof (struct rimage));
+	if (rimg == NULL) {
+		pr_perror("Unable to allocate remote_image structures");
+		return NULL;
+	}
+
+	buf = malloc(sizeof (struct rbuf));
+	if (buf == NULL) {
+		pr_perror("Unable to allocate remote_buffer structures");
+		goto err_rimg;
+	}
+
+	/* strncpy() alone does not terminate on truncation. */
+	strncpy(rimg->path, path, PATHLEN - 1);
+	rimg->path[PATHLEN - 1] = '\0';
+	strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
+	rimg->snapshot_id[PATHLEN - 1] = '\0';
+	rimg->size = 0;
+	buf->nbytes = 0;
+	INIT_LIST_HEAD(&(rimg->buf_head));
+	list_add_tail(&(buf->l), &(rimg->buf_head));
+	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+	rimg->curr_sent_bytes = 0;
+
+	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
+		pr_perror("Remote image in_use mutex init failed");
+		goto err_buf;
+	}
+	return rimg;
+
+err_buf:
+	free(buf);
+err_rimg:
+	free(rimg);
+	return NULL;
+}
+
+/* Empties a remote image so that it can be reused: every buffer but the
+ * first is released, the first one is marked empty and the size and "sent"
+ * bookkeeping are reset. Done under the image's 'in_use' lock.
+ * Returns 'rimg'. */
+static struct rimage* clear_remote_image(struct rimage* rimg)
+{
+	struct rbuf* first;
+
+	pthread_mutex_lock(&(rimg->in_use));
+
+	/* Drop buffers from the tail until a single one is left. */
+	while (!list_is_singular(&(rimg->buf_head))) {
+		struct rbuf* last = list_entry(rimg->buf_head.prev, struct rbuf, l);
+
+		list_del(&(last->l));
+		free(last);
+	}
+
+	first = list_entry(rimg->buf_head.next, struct rbuf, l);
+	first->nbytes = 0;
+	rimg->size = 0;
+	rimg->curr_sent_buf = first;
+	rimg->curr_sent_bytes = 0;
+
+	pthread_mutex_unlock(&(rimg->in_use));
+
+	return rimg;
+}
+
+/* Returns the image that a write or append request should fill.
+ *
+ * When no image with this path and snapshot id is recorded, a new one is
+ * created (NULL if that fails). Otherwise the recorded image is unlinked
+ * from the global image list and returned as is for O_APPEND, or cleared
+ * first for any other open mode. */
+struct rimage* prepare_remote_image(char* path, char* snapshot_id, int open_mode)
+{
+	struct rimage* known = get_rimg_by_name(snapshot_id, path);
+
+	/* There is no record of such image, create a new one. */
+	if (known == NULL)
+		return new_remote_image(path, snapshot_id);
+
+	pthread_mutex_lock(&rimg_lock);
+	list_del(&(known->l));
+	pthread_mutex_unlock(&rimg_lock);
+
+	/* Appending keeps the existing content, writing starts from scratch. */
+	if (open_mode != O_APPEND)
+		known = clear_remote_image(known);
+
+	return known;
+}
+
+static struct rimage* wait_for_image(worker_thread* wt)
+{
+	struct rimage *result;
+	result = get_rimg_by_name(wt->snapshot_id, wt->path);
+	if (result != NULL) {
+		if (write_reply_header(wt->fd, 0) < 0) {
+			pr_perror("Error writing reply header for %s:%s", 
+			    wt->path, wt->snapshot_id);
+			close(wt->fd);
+			return NULL;
+		}
+		return result;
+	}
+
+	/* The file does not exist but the request is for a parent file.
+	 * A parent file may not exist for the first process. */
+	if (!is_receiving() && !strncmp(wt->path, PARENT_IMG, PATHLEN)) {
+		pr_info("Finished sending empty %s:%s to CRIU\n", 
+			    wt->path, wt->snapshot_id);
+		if (write_reply_header(wt->fd, 0) < 0) {
+			pr_perror("Error writing header for %s:%s",
+				    wt->path, wt->snapshot_id);
+		}
+		close(wt->fd);
+		return NULL;
+	}
+	/* The file does not exist and we do not expect new files */
+	if (finished && !is_receiving()) {
+	    pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
+		if (write_reply_header(wt->fd, ENOENT) < 0) {
+			pr_perror("Error writing reply header for unexisting image");
+		}
+		close(wt->fd);
+		return NULL;
+	}
+	/* NOTE: at this point, when the thread wakes up, either the image is
+	 * already in memory or it will never come (the dump is finished). */
+	sem_wait(&(wt->wakeup_sem));
+	result = get_rimg_by_name(wt->snapshot_id, wt->path);
+	if (result != NULL) {
+		if (write_reply_header(wt->fd, 0) < 0) {
+			pr_perror("Error writing reply header for %s:%s", 
+			    wt->path, wt->snapshot_id);
+			close(wt->fd);
+			return NULL;
+		}
+		return result;
+	}
+	else {
+		pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
+		if (write_reply_header(wt->fd, ENOENT) < 0) {
+			    pr_perror("Error writing reply header for unexisting image");
+		}
+		close(wt->fd);
+		return NULL;
+	}
+}
+
+/* Body of a worker thread; 'ptr' is the worker_thread describing the
+ * request. Always returns NULL.
+ *
+ * Read requests (O_RDONLY): waits for the image and sends it to the client.
+ * Other requests (image proxy only): receives the image from the client,
+ * records it, forwards it to the image cache and, once the dump is finished
+ * and nothing is being received or forwarded any more, closes the
+ * connection to the cache and wakes up the remaining workers. */
+static void* process_local_image_connection(void* ptr)
+{
+	worker_thread* wt = (worker_thread*) ptr;
+	struct rimage* rimg = NULL;
+	/* The helpers return uint64_t but signal failure with -1; a signed
+	 * variable is needed for the '< 0' checks to ever be true. */
+	int64_t ret;
+
+	/* NOTE: the code inside this if is shared for both cache and proxy. */
+	if (wt->flags == O_RDONLY) {
+		rimg = wait_for_image(wt);
+		if (!rimg)
+			return NULL;
+
+		pthread_mutex_lock(&(rimg->in_use));
+		ret = send_image(wt->fd, rimg, wt->flags, true);
+		if (ret < 0)
+			pr_perror("Unable to send %s:%s to CRIU (sent %lu bytes)",
+			    rimg->path, rimg->snapshot_id, (unsigned long) ret);
+		else
+			pr_info("Finished sending %s:%s to CRIU (sent %lu bytes)\n",
+			    rimg->path, rimg->snapshot_id, (unsigned long) ret);
+		pthread_mutex_unlock(&(rimg->in_use));
+	}
+	/* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
+	 * through TCP (see accept_remote_image_connections). */
+	else {
+		rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
+		if (!rimg) {
+			pr_perror("Unable to prepare image %s:%s",
+			    wt->path, wt->snapshot_id);
+			close(wt->fd);
+			/* Release both counters taken by the accepting thread. */
+			finalize_recv_rimg(NULL);
+			finalize_fwd_rimg();
+			return NULL;
+		}
+
+		ret = recv_image(wt->fd, rimg, 0, wt->flags, true);
+		if (ret < 0) {
+			pr_perror("Unable to receive %s:%s to CRIU (received %lu bytes)",
+			    rimg->path, rimg->snapshot_id, (unsigned long) ret);
+			finalize_recv_rimg(NULL);
+			/* Nothing will be forwarded for this request; without
+			 * this the 'forwarding' counter never drops to zero. */
+			finalize_fwd_rimg();
+			return NULL;
+		}
+		finalize_recv_rimg(rimg);
+		pr_info("Finished receiving %s:%s (received %lu bytes)\n",
+			rimg->path, rimg->snapshot_id, (unsigned long) ret);
+
+		/* Only one image at a time may use the proxy to cache socket. */
+		pthread_mutex_lock(&proxy_to_cache_lock);
+		ret = forward_image(rimg);
+		pthread_mutex_unlock(&proxy_to_cache_lock);
+		finalize_fwd_rimg();
+		if (ret < 0) {
+			pr_perror("Unable to forward %s:%s to Image Cache",
+			    rimg->path, rimg->snapshot_id);
+			return NULL;
+		}
+
+		if (finished && !is_forwarding() && !is_receiving()) {
+			pr_info("Closing connection to Image Cache.\n");
+			close(proxy_to_cache_fd);
+			unlock_workers();
+		}
+	}
+	return NULL;
+}
+
+
+/* Thread body that serves the local (UNIX) socket. 'port' points to an int
+ * holding the listening socket fd (despite its name).
+ *
+ * For every accepted connection it reads the request header and starts a
+ * worker thread (process_local_image_connection) that is then registered
+ * with add_worker(). A client that disconnects before sending a header is
+ * dropped and the loop goes on; any other failure ends this thread.
+ * Always returns NULL. */
+void* accept_local_image_connections(void* port)
+{
+	int fd = *((int*) port);
+	int cli_fd;
+	struct sockaddr_in cli_addr;
+	socklen_t clilen;
+	worker_thread* wt;
+	int64_t ret;
+
+	while (1) {
+		/* accept() overwrites the length, so it is reset every time. */
+		clilen = sizeof (cli_addr);
+		cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
+		if (cli_fd < 0) {
+			pr_perror("Unable to accept local image connection");
+			return NULL;
+		}
+
+		wt = new_worker();
+		if (!wt) {
+			close(cli_fd);
+			return NULL;
+		}
+		wt->fd = cli_fd;
+
+		/* read_header() returns uint64_t; failure is -1 and 0 means that
+		 * no header was read, which leaves path and flags unset. */
+		ret = read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags));
+		if (ret <= 0) {
+			if (ret < 0)
+				pr_perror("Error reading local image header");
+			close(cli_fd);
+			sem_destroy(&(wt->wakeup_sem));
+			free(wt);
+			if (ret < 0)
+				return NULL;
+			continue;
+		}
+
+		pr_info("Received %s request for %s:%s\n",
+		    wt->flags == O_RDONLY ? "read" :
+			wt->flags == O_APPEND ? "append" : "write",
+		    wt->path, wt->snapshot_id);
+
+		/* These function calls prevent other threads from thinking
+		 * that no more images are coming. */
+		if (wt->flags != O_RDONLY) {
+			prepare_recv_rimg();
+			prepare_fwd_rimg();
+		}
+
+		if (pthread_create(
+		    &(wt->tid), NULL, process_local_image_connection, (void*) wt)) {
+			pr_perror("Unable to create worker thread");
+			/* Undo the announcements made above and release the worker. */
+			if (wt->flags != O_RDONLY) {
+				finalize_recv_rimg(NULL);
+				finalize_fwd_rimg();
+			}
+			close(cli_fd);
+			sem_destroy(&(wt->wakeup_sem));
+			free(wt);
+			return NULL;
+		}
+		/* NOTE(review): the worker is registered after its thread was
+		 * started, so a wake-up sent through get_wt_by_name() in between
+		 * may not find it - confirm whether this window matters. */
+		add_worker(wt);
+	}
+}
+
+/* Reads image data from 'fd' into the buffers of 'rimg'.
+ *
+ * 'size' is the value rimg->size must reach for the transfer to be complete;
+ * zero means "read until the peer closes the socket". With O_APPEND the
+ * data is added to the last buffer of the image, otherwise filling starts
+ * at the first buffer. A new buffer is allocated whenever the current one
+ * becomes full.
+ *
+ * Returns rimg->size when 'size' was reached or the peer closed the socket,
+ * and -1 (converted to uint64_t) on a read or allocation error. 'fd' is
+ * closed before returning when 'close_fd' is set. */
+uint64_t recv_image(int fd, struct rimage* rimg, uint64_t size, int flags, bool close_fd)
+{
+	struct rbuf* tail;
+	uint64_t result;
+	int got;
+
+	if (flags == O_APPEND)
+		tail = list_entry(rimg->buf_head.prev, struct rbuf, l);
+	else
+		tail = list_entry(rimg->buf_head.next, struct rbuf, l);
+
+	for (;;) {
+		/* Never read past the current buffer nor past the announced size. */
+		uint64_t room = BUF_SIZE - tail->nbytes;
+		uint64_t want = size ? MIN(size - rimg->size, room) : room;
+
+		got = read(fd, tail->buffer + tail->nbytes, want);
+		if (got < 0) {
+			pr_perror("Read on %s:%s socket failed",
+				rimg->path, rimg->snapshot_id);
+			result = -1;
+			break;
+		}
+		if (got == 0) {
+			result = rimg->size;
+			break;
+		}
+
+		tail->nbytes += got;
+		rimg->size += got;
+		if (tail->nbytes == BUF_SIZE) {
+			struct rbuf* fresh = malloc(sizeof(struct rbuf));
+
+			if (fresh == NULL) {
+				pr_perror("Unable to allocate remote_buffer structures");
+				result = -1;
+				break;
+			}
+			fresh->nbytes = 0;
+			list_add_tail(&(fresh->l), &(rimg->buf_head));
+			tail = fresh;
+		}
+		if (size && rimg->size == size) {
+			result = rimg->size;
+			break;
+		}
+	}
+
+	if (close_fd)
+		close(fd);
+	return result;
+}
+
+/* Sends the buffers of 'rimg' on 'fd'.
+ *
+ * Unless 'flags' is O_APPEND the transfer starts at the first buffer; with
+ * O_APPEND it resumes at the position recorded in rimg->curr_sent_buf and
+ * rimg->curr_sent_bytes by a previous call.
+ *
+ * Returns the number of full buffers sent by this call times BUF_SIZE plus
+ * the byte count of the last buffer, 0 if the peer closed the connection
+ * early (EPIPE or ECONNRESET), and -1 (converted to uint64_t) on any other
+ * send error. When 'close_fd' is set, 'fd' is closed before returning on
+ * every path, including the error ones. */
+uint64_t send_image(int fd, struct rimage* rimg, int flags, bool close_fd)
+{
+	int n, nblocks = 0;
+	uint64_t ret;
+
+	if (flags != O_APPEND) {
+		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+		rimg->curr_sent_bytes = 0;
+	}
+
+	while (1) {
+		n = send(
+		    fd,
+		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
+		    MIN(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
+		    MSG_NOSIGNAL);
+		if (n > -1) {
+			rimg->curr_sent_bytes += n;
+			if (rimg->curr_sent_bytes == BUF_SIZE) {
+				/* Full buffer done, move on to the next one. */
+				rimg->curr_sent_buf =
+				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
+				nblocks++;
+				rimg->curr_sent_bytes = 0;
+			}
+			else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
+				/* A partially filled buffer is the last one. */
+				ret = nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
+				break;
+			}
+		}
+		else if (errno == EPIPE || errno == ECONNRESET) {
+			pr_warn("Connection for %s:%s was closed early than expected\n",
+				rimg->path, rimg->snapshot_id);
+			ret = 0;
+			break;
+		}
+		else {
+			pr_perror("Write on %s:%s socket failed",
+				rimg->path, rimg->snapshot_id);
+			ret = -1;
+			break;
+		}
+	}
+
+	/* The descriptor used to leak on the two failure paths above. */
+	if (close_fd)
+		close(fd);
+	return ret;
+}
diff --git a/include/img-remote-proto.h b/include/img-remote-proto.h
new file mode 100644
index 0000000..638c5a7
--- /dev/null
+++ b/include/img-remote-proto.h
@@ -0,0 +1,69 @@
+#ifndef IMAGE_REMOTE_PVT_H
+#define	IMAGE_REMOTE_PVT_H
+
+#include <stdbool.h>
+#include <stdint.h>
+#include "list.h"
+#include "img-remote.h"
+#include <pthread.h>
+#include <semaphore.h>
+
+#define DEFAULT_LISTEN 50
+#define PAGESIZE 4096
+#define BUF_SIZE PAGESIZE
+
+/* One fixed-size chunk of image data. An image is a list of these chunks,
+ * linked through 'l' into rimage.buf_head. */
+struct rbuf {
+	char buffer[BUF_SIZE];
+	int nbytes; /* How many bytes are in the buffer. */
+	struct list_head l;
+};
+
+/* An image kept in memory, identified by its path and snapshot id. */
+struct rimage {
+	char path[PATHLEN];
+	char snapshot_id[PATHLEN];
+	/* Link in the list of known images. */
+	struct list_head l;
+	/* Head of the list of struct rbuf holding the image data. */
+	struct list_head buf_head;
+	/* Used to track already sent buffers when the image is appended. */
+	struct rbuf* curr_sent_buf; 
+	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
+	int curr_sent_bytes;
+	uint64_t size; /* number of bytes */
+	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
+};
+
+/* This variable is used to indicate when the dump is finished. */
+extern bool finished;
+/* This is the proxy to cache TCP socket FD. */
+extern int proxy_to_cache_fd;
+
+int init_daemon();
+
+void join_workers();
+void unlock_workers();
+
+void prepare_recv_rimg();
+void finalize_recv_rimg(struct rimage* rimg);
+/* Returns the image to fill for a write or append request: a new image, or
+ * the already recorded one (cleared unless open_mode is O_APPEND). */
+struct rimage* prepare_remote_image(char* path, char* snapshot_id, int open_mode);
+
+void* accept_local_image_connections(void* ptr);
+void* accept_remote_image_connections(void* ptr);
+
+uint64_t forward_image(struct rimage* rimg);
+/* When close_fd is true, fd is closed by the callee before it returns. For
+ * recv_image, size == 0 means "read until the peer closes the socket". */
+uint64_t send_image(int fd, struct rimage* rimg, int flags, bool close_fd);
+uint64_t recv_image(int fd, struct rimage* rimg, uint64_t size, int flags, bool close_fd);
+
+uint64_t pb_write_obj(int fd, void* obj, int type);
+uint64_t pb_read_obj(int fd, void** obj, int type);
+
+uint64_t write_header(int fd, char* snapshot_id, char* path, int open_mode);
+uint64_t read_header(int fd, char* snapshot_id, char* path, int* open_mode);
+uint64_t write_reply_header(int fd, int error);
+uint64_t read_reply_header(int fd, int* error);
+uint64_t read_remote_header(int fd, char* snapshot_id, char* path, int* open_mode, uint64_t* size);
+uint64_t write_remote_header(int fd, char* snapshot_id, char* path, int open_mode, uint64_t size);
+
+int setup_TCP_server_socket(int port);
+int setup_TCP_client_socket(char* hostname, int port);
+int setup_UNIX_client_socket(char* path);
+int setup_UNIX_server_socket(char* path);
+#endif
-- 
1.9.1
-- 
Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
    
    
More information about the CRIU
mailing list