[CRIU] Process Migration using Sockets - PATCH 2/2

Rodrigo Bruno rbruno at gsd.inesc-id.pt
Sat Oct 3 12:41:35 PDT 2015


Hi,

On Wed, 30 Sep 2015 22:32:48 +0300
Pavel Emelyanov <xemul at parallels.com> wrote:

> On 09/27/2015 01:47 AM, Rodrigo Bruno wrote:
> > Hi, 
> > 
> > here goes the second part.
> 
> Same wish here -- a big descriptive comment of what's going on is required.
> Plus more comments inline.

As with the first patch, I will prepare the code and resend both patches.

> 
> > Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
> > 
> >>From 954956d6b2fa583171b79afb99468f6acd356efb Mon Sep 17 00:00:00 2001
> > From: rbruno <rbruno at gsd.inesc-id.pt>
> > Date: Sat, 26 Sep 2015 23:32:37 +0100
> > Subject: [PATCH] Criu remote migration using sockets. Patch 2/2.
> > 
> > ---
> >  Makefile.crtools           |   3 +
> >  crtools.c                  |  20 +-
> >  image-cache.c              |  56 ++++++
> >  image-proxy.c              |  74 +++++++
> >  image-remote-pvt.c         | 483 +++++++++++++++++++++++++++++++++++++++++++++
> >  include/image-remote-pvt.h |  53 +++++
> >  6 files changed, 687 insertions(+), 2 deletions(-)
> >  create mode 100644 image-cache.c
> >  create mode 100644 image-proxy.c
> >  create mode 100644 image-remote-pvt.c
> >  create mode 100644 include/image-remote-pvt.h
> > 
> > diff --git a/Makefile.crtools b/Makefile.crtools
> > index 3667cf3..4a74fa8 100644
> > --- a/Makefile.crtools
> > +++ b/Makefile.crtools
> > @@ -7,6 +7,9 @@ obj-y	+= security.o
> >  obj-y	+= image.o
> >  obj-y	+= image-desc.o
> >  obj-y	+= image-remote.o
> > +obj-y	+= image-proxy.o
> > +obj-y	+= image-cache.o
> > +obj-y	+= image-remote-pvt.o
> >  obj-y	+= net.o
> >  obj-y	+= tun.o
> >  obj-y	+= proc_parse.o
> > diff --git a/crtools.c b/crtools.c
> > index bb92ff3..70d4b8f 100644
> > --- a/crtools.c
> > +++ b/crtools.c
> > @@ -43,6 +43,8 @@
> > 
> >  #include "setproctitle.h"
> > 
> > +#include "image-remote.h"
> > +
> >  struct cr_options opts;
> > 
> >  void init_opts(void)
> > @@ -62,6 +64,8 @@ void init_opts(void)
> >  	opts.cpu_cap = CPU_CAP_DEFAULT;
> >  	opts.manage_cgroups = CG_MODE_DEFAULT;
> >  	opts.ps_socket = -1;
> > +	opts.addr = PROXY_FWD_HOST;
> > +	opts.ps_port = CACHE_PUT_PORT;
> >  	opts.ghost_limit = DEFAULT_GHOST_LIMIT;
> >  }
> > 
> > @@ -252,7 +256,9 @@ int main(int argc, char *argv[], char *envp[])
> >  		{ "freeze-cgroup",		required_argument,	0, 1068 },
> >  		{ "ghost-limit",		required_argument,	0, 1069 },
> >  		{ "irmap-scan-path",		required_argument,	0, 1070 },
> > -		{ "remote",			no_argument,		0, 1071 },
> > +		{ "remote",			no_argument,		0, 1071 },
> > +		{ "image-cache",		no_argument,		0, 1072 },
> > +		{ "image-proxy",		required_argument,	0, 1073 },
> 
> These options are not handled by the switch() below.
> 
> >  		{ },
> >  	};
> > 
> > @@ -646,6 +652,12 @@ int main(int argc, char *argv[], char *envp[])
> >  	if (!strcmp(argv[optind], "page-server"))
> >  		return cr_page_server(opts.daemon_mode, -1) > 0 ? 0 : 1;
> > 
> > +	if (!strcmp(argv[optind], "image-cache"))
> > +		return image_cache(opts.ps_port);
> > +
> > +	if (!strcmp(argv[optind], "image-proxy"))
> > +		return image_proxy(opts.addr, opts.ps_port);
> > +
> >  	if (!strcmp(argv[optind], "service"))
> >  		return cr_service(opts.daemon_mode);
> > 
> > @@ -672,6 +684,8 @@ usage:
> >  "  criu page-server\n"
> >  "  criu service [<options>]\n"
> >  "  criu dedup\n"
> > +"  criu image-cache [<options>]\n"
> > +"  criu image-proxy [<options>]\n"
> >  "\n"
> >  "Commands:\n"
> >  "  dump           checkpoint a process/tree identified by pid\n"
> > @@ -684,6 +698,8 @@ usage:
> >  "  dedup          remove duplicates in memory dump\n"
> >  "  cpuinfo dump   writes cpu information into image file\n"
> >  "  cpuinfo check  validates cpu information read from image file\n"
> > +"  image-cache    launch image-cache, used for process live migration\n"
> > +"  image-proxy    launch image-proxy, used for process live migration\n"
> >  	);
> > 
> >  	if (usage_error) {
> > @@ -766,7 +782,7 @@ usage:
> >  "                        when used on restore, as soon as page is restored, it\n"
> >  "                        will be punched from the image.\n"
> >  "\n"
> > -"Page/Service server options:\n"
> > +"Page/Service/image-cache/image-proxy server options:\n"
> >  "  --address ADDR        address of server or service\n"
> >  "  --port PORT           port of page server\n"
> >  "  -d|--daemon           run in the background after creating socket\n"
> > diff --git a/image-cache.c b/image-cache.c
> > new file mode 100644
> > index 0000000..84ffbe4
> > --- /dev/null
> > +++ b/image-cache.c
> > @@ -0,0 +1,56 @@
> > +#include <unistd.h>
> > +
> > +#include "image-remote.h"
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +#include <pthread.h>
> > +
> > +void* cache_remote_image(void* ptr)
> 
> It would be better to make this static and pass it into init() as a pointer.
> 
> > +{
> > +	remote_image* rimg = (remote_image*) ptr;
> > +
> > +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof (DUMP_FINISH)))
> > +	{
> > +		close(rimg->src_fd);
> > +		return NULL;
> > +	}
> > +
> > +	prepare_put_rimg();
> > +
> > +	recv_remote_image(rimg->src_fd, rimg->path, &rimg->buf_head, true);
> > +
> > +	finalize_put_rimg(rimg);
> > +
> > +	return NULL;
> > +}
> > +
> > +int image_cache(unsigned short cache_put_port)
> > +{
> > +	pthread_t get_thr, put_thr;
> > +	int put_fd, get_fd;
> > +
> > +	pr_info("Put Port %d, Get Port %d\n", cache_put_port, CACHE_GET_PORT);
> > +
> > +	put_fd = prepare_server_socket(cache_put_port);
> > +	get_fd = prepare_server_socket(CACHE_GET_PORT);
> 
> Please describe which direction is "put" and which is "get".
> 
> > +
> > +	if (init_cache())
> > +		return -1;
> > +
> > +	if (pthread_create(
> > +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> > +		pr_perror("Unable to create put thread");
> > +		return -1;
> > +	}
> > +	if (pthread_create(
> > +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> > +		pr_perror("Unable to create get thread");
> > +		return -1;
> > +	}
> > +
> > +	join_workers();
> > +
> > +	pthread_join(put_thr, NULL);
> > +	pthread_join(get_thr, NULL);
> > +	return 0;
> > +}
> > diff --git a/image-proxy.c b/image-proxy.c
> > new file mode 100644
> > index 0000000..132eb5a
> > --- /dev/null
> > +++ b/image-proxy.c
> > @@ -0,0 +1,74 @@
> > +#include <unistd.h>
> > +
> > +#include "image-remote.h"
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +#include <pthread.h>
> > +
> > +static char* dst_host;
> > +static unsigned short dst_port;
> > +
> > +void* proxy_remote_image(void* ptr)
> 
> This also should be static.
> 
> > +{
> > +	remote_image* rimg = (remote_image*) ptr;
> > +	rimg->dst_fd = prepare_client_socket(dst_host, dst_port);
> > +	if (rimg->dst_fd < 0) {
> > +		pr_perror("Unable to open recover image socket");
> > +		return NULL;
> > +	}
> > +
> > +	if (write_header(rimg->dst_fd, rimg->namespace, rimg->path) < 0) {
> > +		pr_perror("Error writing header for %s:%s",
> > +			rimg->path, rimg->namespace);
> > +		return NULL;
> > +	}
> > +
> > +	prepare_put_rimg();
> > +
> > +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
> > +	    close(rimg->dst_fd);
> > +	    finalize_put_rimg(rimg);
> > +	    return NULL;
> 
> Something happened with the indentation here.
> 
> > +	}
> > +	if (recv_remote_image(rimg->src_fd, rimg->path, &(rimg->buf_head), false) < 0) {
> > +		return NULL;
> > +	}
> > +	finalize_put_rimg(rimg);
> > +	send_remote_image(rimg->dst_fd, rimg->path, &(rimg->buf_head), true);
> > +	return NULL;
> > +}
> > +
> > +int image_proxy(char* fwd_host, unsigned short fwd_port)
> > +{
> > +	pthread_t get_thr, put_thr;
> > +	int put_fd, get_fd;
> > +
> > +	dst_host = fwd_host;
> > +	dst_port = fwd_port;
> > +
> > +	pr_info("Proxy Get Port %d, Put Port %d, Destination Host %s:%hu\n",
> > +		PROXY_GET_PORT, PROXY_PUT_PORT, fwd_host, fwd_port);
> > +
> > +	put_fd = prepare_server_socket(PROXY_PUT_PORT);
> > +	get_fd = prepare_server_socket(PROXY_GET_PORT);
> > +
> > +	if(init_proxy())
> > +		return -1;
> > +
> > +	if (pthread_create(
> > +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> > +		pr_perror("Unable to create put thread");
> > +		return -1;
> > +	}
> > +	if (pthread_create(
> > +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> > +		pr_perror("Unable to create get thread");
> > +		return -1;
> > +	}
> > +
> > +	join_workers();
> > +
> > +	pthread_join(put_thr, NULL);
> > +	pthread_join(get_thr, NULL);
> > +	return 0;
> > +}
> > diff --git a/image-remote-pvt.c b/image-remote-pvt.c
> > new file mode 100644
> > index 0000000..12263d8
> > --- /dev/null
> > +++ b/image-remote-pvt.c
> 
> This file looks OK as it is, but before commenting, I'd like to see the
> description of the protocol as requested in comments to patch #1.
> 
> -- Pavel
> 
> > @@ -0,0 +1,483 @@
> > +#include <unistd.h>
> > +#include <stdlib.h>
> > +
> > +#include <semaphore.h>
> > +#include <sys/socket.h>
> > +#include <netinet/in.h>
> > +#include <netdb.h>
> > +#include <pthread.h>
> > +
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +
> > +typedef struct wthread {
> > +    pthread_t tid;
> > +    struct list_head l;
> > +} worker_thread;
> > +
> > +static LIST_HEAD(rimg_head);
> > +static pthread_mutex_t rimg_lock;
> > +static sem_t rimg_semph;
> > +
> > +static LIST_HEAD(workers_head);
> > +static pthread_mutex_t workers_lock;
> > +static sem_t workers_semph;
> > +
> > +static int finished = 0;
> > +static int putting = 0;
> > +
> > +static void* (*get_func)(void*);
> > +static void* (*put_func)(void*);
> > +
> > +static remote_image* get_rimg_by_name(const char* namespace, const char* path)
> > +{
> > +	remote_image* rimg = NULL;
> > +	pthread_mutex_lock(&rimg_lock);
> > +	list_for_each_entry(rimg, &rimg_head, l) {
> > +		if( !strncmp(rimg->path, path, PATHLEN) &&
> > +		    !strncmp(rimg->namespace, namespace, PATHLEN)) {
> > +			pthread_mutex_unlock(&rimg_lock);
> > +			return rimg;
> > +		}
> > +	}
> > +	pthread_mutex_unlock(&rimg_lock);
> > +	return NULL;
> > +}
> > +
> > +static uint64_t sizeof_remote_buffer(struct list_head* rbuff_head)
> > +{
> > +    uint64_t res = 0;
> > +    remote_buffer* aux = NULL;
> > +    list_for_each_entry(aux, rbuff_head, l) {
> > +	res += aux->nbytes;
> > +    }
> > +    return res;
> > +}
> > +
> > +int init_sync_structures()
> > +{
> > +	if (pthread_mutex_init(&rimg_lock, NULL) != 0) {
> > +		pr_perror("Remote image connection mutex init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (sem_init(&rimg_semph, 0, 0) != 0) {
> > +		pr_perror("Remote image connection semaphore init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (pthread_mutex_init(&workers_lock, NULL) != 0) {
> > +		pr_perror("Workers mutex init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (sem_init(&workers_semph, 0, 0) != 0) {
> > +		pr_perror("Workers semaphore init failed");
> > +		return -1;
> > +	}
> > +	return 0;
> > +}
> > +
> > +static remote_image* wait_for_image(int cli_fd, char* namespace, char* path)
> > +{
> > +	remote_image *result;
> > +
> > +	while (1) {
> > +		result = get_rimg_by_name(namespace, path);
> > +
> > +		if (result != NULL) {
> > +			if (write_header(cli_fd, namespace, path) < 0) {
> > +				pr_perror("Error writing header for %s:%s",
> > +					path, namespace);
> > +				close(cli_fd);
> > +				return NULL;
> > +			}
> > +			return result;
> > +		}
> > +		/* The file does not exist and we do not expect new files */
> > +		if (finished && !putting) {
> > +			if (write_header(cli_fd, NULL_NAMESPACE, DUMP_FINISH) < 0) {
> > +				pr_perror("Error writing header for %s:%s",
> > +					DUMP_FINISH, NULL_NAMESPACE);
> > +			}
> > +			close(cli_fd);
> > +			return NULL;
> > +		}
> > +		/* The file does not exist but the request is for a parent file.
> > +		   A parent file may not exist for the first process. */
> > +		if (!putting && !strncmp(path, PARENT_IMG, PATHLEN)) {
> > +		    if (write_header(cli_fd, namespace, path) < 0) {
> > +			    pr_perror("Error writing header for %s:%s",
> > +					path, namespace);
> > +		    }
> > +		    close(cli_fd);
> > +		    return NULL;
> > +		}
> > +		sem_wait(&rimg_semph);
> > +	}
> > +}
> > +
> > +void* get_remote_image(void* fd)
> > +{
> > +	int cli_fd = (long) fd;
> > +	remote_image* rimg = NULL;
> > +	char path_buf[PATHLEN];
> > +	char namespace_buf[PATHLEN];
> > +
> > +	if(read_header(cli_fd, namespace_buf, path_buf) < 0) {
> > +		pr_perror("Error reading header");
> > +		return NULL;
> > +	}
> > +
> > +	pr_info("Received GET for %s:%s.\n", path_buf, namespace_buf);
> > +
> > +	rimg = wait_for_image(cli_fd, namespace_buf, path_buf);
> > +	if (!rimg)
> > +		return NULL;
> > +
> > +	rimg->dst_fd = cli_fd;
> > +	send_remote_image(rimg->dst_fd, rimg->path, &rimg->buf_head, false);
> > +	return NULL;
> > +}
> > +
> > +void prepare_put_rimg()
> > +{
> > +	pthread_mutex_lock(&rimg_lock);
> > +	putting++;
> > +	pthread_mutex_unlock(&rimg_lock);
> > +}
> > +
> > +void finalize_put_rimg(remote_image* rimg)
> > +{
> > +	pthread_mutex_lock(&rimg_lock);
> > +	list_add_tail(&(rimg->l), &rimg_head);
> > +	putting--;
> > +	pthread_mutex_unlock(&rimg_lock);
> > +	sem_post(&rimg_semph);
> > +}
> > +
> > +int init_proxy()
> > +{
> > +	get_func = get_remote_image;
> > +	put_func = proxy_remote_image;
> > +	return init_sync_structures();
> > +}
> > +
> > +int init_cache()
> > +{
> > +	get_func = get_remote_image;
> > +	put_func = cache_remote_image;
> > +	return init_sync_structures();
> > +}
> > +
> > +int prepare_server_socket(int port)
> > +{
> > +	struct sockaddr_in serv_addr;
> > +	int sockopt = 1;
> > +
> > +	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> > +	if (sockfd < 0) {
> > +		pr_perror("Unable to open image socket");
> > +		return -1;
> > +	}
> > +
> > +	bzero((char *) &serv_addr, sizeof (serv_addr));
> > +	serv_addr.sin_family = AF_INET;
> > +	serv_addr.sin_addr.s_addr = INADDR_ANY;
> > +	serv_addr.sin_port = htons(port);
> > +
> > +	if (setsockopt(
> > +	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof (sockopt)) == -1) {
> > +		pr_perror("Unable to set SO_REUSEADDR");
> > +		return -1;
> > +	}
> > +
> > +	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
> > +		pr_perror("Unable to bind image socket");
> > +		return -1;
> > +	}
> > +
> > +	if (listen(sockfd, DEFAULT_LISTEN)) {
> > +		pr_perror("Unable to listen image socket");
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +int prepare_client_socket(char* hostname, int port)
> > +{
> > +	struct hostent *server;
> > +	struct sockaddr_in serv_addr;
> > +
> > +	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> > +	if (sockfd < 0) {
> > +		pr_perror("Unable to open recover image socket");
> > +		return -1;
> > +	}
> > +
> > +	server = gethostbyname(hostname);
> > +	if (server == NULL) {
> > +		pr_perror("Unable to get host by name (%s)", hostname);
> > +		return -1;
> > +	}
> > +
> > +	bzero((char *) &serv_addr, sizeof (serv_addr));
> > +	serv_addr.sin_family = AF_INET;
> > +	bcopy((char *) server->h_addr,
> > +	      (char *) &serv_addr.sin_addr.s_addr,
> > +	      server->h_length);
> > +	serv_addr.sin_port = htons(port);
> > +
> > +	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> > +		pr_perror("Unable to connect to remote restore host %s", hostname);
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +static void add_worker(pthread_t tid)
> > +{
> > +	worker_thread* wthread = malloc(sizeof(worker_thread));
> > +	if(!wthread) {
> > +		pr_perror("Unable to allocate worker thread structure");
> > +	}
> > +	wthread->tid = tid;
> > +	pthread_mutex_lock(&workers_lock);
> > +	list_add_tail(&(wthread->l), &workers_head);
> > +	pthread_mutex_unlock(&workers_lock);
> > +	sem_post(&workers_semph);
> > +}
> > +
> > +void join_workers()
> > +{
> > +	worker_thread* wthread = NULL;
> > +	while (1) {
> > +	    if (list_empty(&workers_head)) {
> > +		    sem_wait(&workers_semph);
> > +		    continue;
> > +	    }
> > +	    wthread = list_entry(workers_head.next, worker_thread, l);
> > +	    if (pthread_join(wthread->tid, NULL)) {
> > +		    pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
> > +	    }
> > +	    else {
> > +		    list_del(&(wthread->l));
> > +		    free(wthread);
> > +	    }
> > +
> > +	}
> > +}
> > +
> > +void* accept_get_image_connections(void* port)
> > +{
> > +	socklen_t clilen;
> > +	long cli_fd;
> > +	pthread_t tid;
> > +	int get_fd = *((int*) port);
> > +	struct sockaddr_in cli_addr;
> > +	clilen = sizeof (cli_addr);
> > +
> > +	while (1) {
> > +
> > +		cli_fd = accept(get_fd, (struct sockaddr *) &cli_addr, &clilen);
> > +		if (cli_fd < 0) {
> > +			pr_perror("Unable to accept get image connection");
> > +			return NULL;
> > +		}
> > +
> > +		if (pthread_create(
> > +		    &tid, NULL, get_func, (void*) cli_fd)) {
> > +			pr_perror("Unable to create put thread");
> > +			return NULL;
> > +		}
> > +
> > +		add_worker(tid);
> > +	}
> > +}
> > +
> > +void* accept_put_image_connections(void* port)
> > +{
> > +	socklen_t clilen;
> > +	int cli_fd;
> > +	pthread_t tid;
> > +	int put_fd = *((int*) port);
> > +	struct sockaddr_in cli_addr;
> > +	clilen = sizeof(cli_addr);
> > +	char path_buf[PATHLEN];
> > +	char namespace_buf[PATHLEN];
> > +
> > +	while (1) {
> > +
> > +		cli_fd = accept(put_fd, (struct sockaddr *) &cli_addr, &clilen);
> > +		if (cli_fd < 0) {
> > +			pr_perror("Unable to accept put image connection");
> > +			return NULL;
> > +		}
> > +
> > +		if (read_header(cli_fd, namespace_buf, path_buf) < 0) {
> > +		    pr_perror("Error reading header");
> > +		    continue;
> > +		}
> > +
> > +		remote_image* rimg = get_rimg_by_name(namespace_buf, path_buf);
> > +
> > +		pr_info("Reveiced PUT request for %s:%s\n", path_buf, namespace_buf);
> > +
> > +		if (rimg == NULL) {
> > +			rimg = malloc(sizeof (remote_image));
> > +			if (rimg == NULL) {
> > +				pr_perror("Unable to allocate remote_image structures");
> > +				return NULL;
> > +			}
> > +
> > +			remote_buffer* buf = malloc(sizeof (remote_buffer));
> > +			if (buf == NULL) {
> > +				pr_perror("Unable to allocate remote_buffer structures");
> > +				return NULL;
> > +			}
> > +
> > +			strncpy(rimg->path, path_buf, PATHLEN);
> > +			strncpy(rimg->namespace, namespace_buf, PATHLEN);
> > +			buf->nbytes = 0;
> > +			INIT_LIST_HEAD(&(rimg->buf_head));
> > +			list_add_tail(&(buf->l), &(rimg->buf_head));
> > +		}
> > +		/* NOTE: we implement a PUT by clearing the previous file. */
> > +		else {
> > +		    pr_info("Clearing previous images for %s:%s\n",
> > +			    path_buf, namespace_buf);
> > +			pthread_mutex_lock(&rimg_lock);
> > +			list_del(&(rimg->l));
> > +			pthread_mutex_unlock(&rimg_lock);
> > +			while (!list_is_singular(&(rimg->buf_head))) {
> > +				list_del(rimg->buf_head.prev);
> > +			}
> > +			list_entry(rimg->buf_head.next, remote_buffer, l)->nbytes = 0;
> > +		}
> > +		rimg->src_fd = cli_fd;
> > +		rimg->dst_fd = -1;
> > +
> > +		if (pthread_create(
> > +		    &tid, NULL, put_func, (void*) rimg)) {
> > +			pr_perror("Unable to create put thread");
> > +			return NULL;
> > +		}
> > +
> > +		pr_info("Serving PUT request for %s:%s (tid=%lu)\n",
> > +			rimg->path, rimg->namespace, (unsigned long) tid);
> > +
> > +		add_worker(tid);
> > +
> > +		if (!strncmp(path_buf, DUMP_FINISH, sizeof (DUMP_FINISH))) {
> > +			finished = 1;
> > +			pr_info("Received DUMP FINISH\n");
> > +			sem_post(&rimg_semph);
> > +		}
> > +	}
> > +}
> > +
> > +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> > +{
> > +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> > +	int n, nblocks = 0;
> > +	uint64_t image_bytes = 0;
> > +	uint64_t recv_bytes = 0;
> > +
> > +	if (image_check && (read(fd, &image_bytes, sizeof(uint64_t)) != sizeof(uint64_t))) {
> > +		pr_perror("Unable to read size of the image %s", path);
> > +		close(fd);
> > +		return -1;
> > +	}
> > +
> > +	while(1) {
> > +		n = read(fd,
> > +			 curr_buf->buffer + curr_buf->nbytes,
> > +			 BUF_SIZE - curr_buf->nbytes);
> > +		if (n == 0 && image_check && image_bytes != recv_bytes) {
> > +			pr_info("Failed receiving %s (received %lu instead of %lu)\n",
> > +			    path, recv_bytes, image_bytes);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +		else if (n == 0) {
> > +			pr_info("Finished receiving %s (%d full blocks, %d bytes on last block)\n",
> > +			    path, nblocks, curr_buf->nbytes);
> > +			close(fd);
> > +			return recv_bytes;
> > +		}
> > +		else if (n > 0) {
> > +			curr_buf->nbytes += n;
> > +			recv_bytes = nblocks*BUF_SIZE + curr_buf->nbytes;
> > +			if(curr_buf->nbytes == BUF_SIZE) {
> > +				remote_buffer* buf = malloc(sizeof(remote_buffer));
> > +				if (buf == NULL) {
> > +					pr_perror("Unable to allocate remote_buffer structures");
> > +					close(fd);
> > +					return -1;
> > +				}
> > +				buf->nbytes = 0;
> > +				list_add_tail(&(buf->l), rbuff_head);
> > +				curr_buf = buf;
> > +				nblocks++;
> > +			}
> > +		}
> > +		else {
> > +			pr_perror("Read on %s socket failed", path);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +	}
> > +}
> > +
> > +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> > +{
> > +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> > +	int n, curr_offset, nblocks;
> > +
> > +	nblocks = 0;
> > +	curr_offset = 0;
> > +
> > +	if (image_check) {
> > +		uint64_t image_bytes = sizeof_remote_buffer(rbuff_head);
> > +		n = send(fd, &image_bytes, sizeof(uint64_t), MSG_NOSIGNAL);
> > +		if (n < sizeof(uint64_t)) {
> > +			pr_perror("Unable to send size of the image %s", path);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +	}
> > +
> > +	while(1) {
> > +		n = send(
> > +		    fd,
> > +		    curr_buf->buffer + curr_offset,
> > +		    MIN(BUF_SIZE, curr_buf->nbytes) - curr_offset,
> > +		    MSG_NOSIGNAL);
> > +		if (n > -1) {
> > +			curr_offset += n;
> > +			if (curr_offset == BUF_SIZE) {
> > +				curr_buf =
> > +				    list_entry(curr_buf->l.next, remote_buffer, l);
> > +				nblocks++;
> > +				curr_offset = 0;
> > +			}
> > +			else if (curr_offset == curr_buf->nbytes) {
> > +				pr_info("Finished forwarding %s (%d full blocks, %d bytes on last block)\n",
> > +					path, nblocks, curr_offset);
> > +				close(fd);
> > +			       return nblocks*BUF_SIZE + curr_buf->nbytes;
> > +			}
> > +		}
> > +		else if (errno == EPIPE || errno == ECONNRESET) {
> > +			pr_warn("Connection for %s was closed early than expected\n",
> > +				path);
> > +			return 0;
> > +		}
> > +		else {
> > +			pr_perror("Write on %s socket failed", path);
> > +			return -1;
> > +		}
> > +	}
> > +}
> > diff --git a/include/image-remote-pvt.h b/include/image-remote-pvt.h
> > new file mode 100644
> > index 0000000..69e06e1
> > --- /dev/null
> > +++ b/include/image-remote-pvt.h
> > @@ -0,0 +1,53 @@
> > +#ifndef IMAGE_REMOTE_PVT_H
> > +#define	IMAGE_REMOTE_PVT_H
> > +
> > +#include <stdbool.h>
> > +#include "list.h"
> > +#include "image-remote.h"
> > +
> > +#define DEFAULT_LISTEN 50
> > +#define PAGESIZE 4096
> > +#define BUF_SIZE PAGESIZE
> > +
> > +/*
> > + * This header is used by both the image-proxy and the image-cache.
> > + */
> > +
> > +typedef struct rbuf {
> > +    char buffer[BUF_SIZE];
> > +    int nbytes; /* How many bytes are in the buffer. */
> > +    struct list_head l;
> > +} remote_buffer;
> > +
> > +typedef struct rimg {
> > +    char path[PATHLEN];
> > +    char namespace[PATHLEN];
> > +    int src_fd;
> > +    int dst_fd;
> > +    struct list_head l;
> > +    struct list_head buf_head;
> > +
> > +} remote_image;
> > +
> > +int init_cache();
> > +int init_proxy();
> > +
> > +void join_workers();
> > +
> > +void prepare_put_rimg();
> > +void finalize_put_rimg(remote_image* rimg);
> > +
> > +void* accept_get_image_connections(void* port);
> > +void* accept_put_image_connections(void* port);
> > +
> > +void* cache_remote_image(void* rimg);
> > +void* proxy_remote_image(void* rimg);
> > +
> > +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> > +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> > +
> > +int prepare_server_socket(int port);
> > +int prepare_client_socket(char* server, int port);
> > +
> > +#endif	/* IMAGE_REMOTE_INTERNAL_H */
> > +
> > --
> > 1.9.1
> > 
> > 
> > 
> 


-- 
Rodrigo Bruno <rbruno at gsd.inesc-id.pt>


More information about the CRIU mailing list