[CRIU] Process Migration using Sockets v2 - Patch 2/2

Rodrigo Bruno rbruno at gsd.inesc-id.pt
Mon Oct 12 04:11:01 PDT 2015


On Mon, 12 Oct 2015 12:38:43 +0300
Pavel Emelyanov <xemul at parallels.com> wrote:

> On 10/09/2015 05:46 PM, Rodrigo Bruno wrote:
> > Hi,
> > 
> > here goes the second part.
> 
> Thanks, this time it's more clear. Please, find my comments inline.
> 
> > This patch adds:
> > 
> > 1. image-cache.c - the component that is on the same node as CRIU restore. It holds
> > image files in memory, waiting for CRIU restore to ask for them.
> > 
> > 2. image-proxy.c - the component that is on the same node as CRIU dump. It holds
> > image files in memory (because a future dump/predump might need them) and also
> > forwards them to the image-cache.
> > 
> > 3. image-remote-pvt.c - implements almost all the image receiving and sending
> > engine.
> 
> What does the "pvt" stand for?

Private. Maybe "prvt" would be better. The idea is that this code is invisible
to the existing CRIU code, since it is only used by the proxy, the cache, and
image-remote.

> 
> > 4. include/image-remote-pvt.h - exports function declarations used by image-cache,
> > image-proxy, and image-remote.
> > 
> > 
> > Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
> > From e690070808e6f1b459f033e162c910bc2d02859a Mon Sep 17 00:00:00 2001
> > From: rodrigo-bruno <rbruno at gsd.inesc-id.pt>
> > Date: Fri, 9 Oct 2015 15:37:17 +0100
> > Subject: [PATCH] Process migration using sockets (2/2)
> > 
> > ---
> >  image-cache.c              |  54 +++++
> >  image-proxy.c              |  74 ++++++
> >  image-remote-pvt.c         | 566 +++++++++++++++++++++++++++++++++++++++++++++
> >  include/image-remote-pvt.h |  52 +++++
> >  4 files changed, 746 insertions(+)
> >  create mode 100644 image-cache.c
> >  create mode 100644 image-proxy.c
> >  create mode 100644 image-remote-pvt.c
> >  create mode 100644 include/image-remote-pvt.h
> > 
> > diff --git a/image-cache.c b/image-cache.c
> > new file mode 100644
> > index 0000000..e2030f4
> > --- /dev/null
> > +++ b/image-cache.c
> > @@ -0,0 +1,54 @@
> > +#include <unistd.h>
> > +
> > +#include "image-remote.h"
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +#include <pthread.h>
> > +
> > +static void* cache_remote_image(void* ptr)
> > +{
> > +	remote_image* rimg = (remote_image*) ptr;
> > +
> > +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof (DUMP_FINISH)))
> > +	{
> > +		close(rimg->src_fd);
> > +		return NULL;
> > +	}
> > +
> > +	prepare_put_rimg();
> > +	recv_remote_image(rimg->src_fd, rimg->path, &rimg->buf_head, true);
> > +	finalize_put_rimg(rimg);
> > +
> > +	return NULL;
> > +}
> > +
> > +int image_cache(unsigned short cache_put_port)
> > +{
> > +	pthread_t get_thr, put_thr;
> > +	int put_fd, get_fd;
> > +
> > +	pr_info("Put Port %d, Get Path %s\n", cache_put_port, CACHE_GET);
> > +
> > +	put_fd = setup_TCP_server_socket(cache_put_port);
> 
> Proxy and cache should be able to work over given connections. Look at the
> opts.ps_socket field for details how this works in case of page-server.

Okay, this is related to the previous comment on multiple connections. I will
have a look at it and try to mimic it in the proxy-cache communication.

> 
> > +	get_fd = setup_UNIX_server_socket(CACHE_GET);
> > +
> > +	if (init_daemon(cache_remote_image))
> > +		return -1;
> > +
> > +	if (pthread_create(
> > +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> > +		pr_perror("Unable to create put thread");
> > +		return -1;
> > +	}
> > +	if (pthread_create(
> > +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> > +		pr_perror("Unable to create get thread");
> > +		return -1;
> > +	}
> 
> Can you describe the logic behind the multithreading and sems/mutexes you use?

I use two semaphores and two mutexes: one semaphore and one mutex protect the
worker thread list, and the other pair protects the remote image list.

Worker threads are created to serve a specific request, either reading or writing
an image. When a thread finishes reading or writing the image, it exits. There are
also a few auxiliary threads: one that accepts read connections, one that accepts
write connections, and one that joins the worker threads.

Mutexes are used to make sure that no one is changing or searching a list at the
same time. Looking at the code now, I realize that I do not allow multiple searches
in parallel. This can be solved by adding an additional mutex (a reader mutex).

Semaphores are used to signal that a new item was added to a list or, as a
special case for the remote image list, that no more images will come.
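
For illustration, the "reader mutex" idea could also be expressed with a POSIX
read-write lock (pthread_rwlock_t) instead of a second mutex: lookups take the
lock in shared mode and may run in parallel, while insertions take it
exclusively. This is a minimal sketch, not the patch's code; struct img,
img_lookup(), img_add() and NAMELEN are hypothetical simplifications of
remote_image, get_rimg_by_name() and PATHLEN.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define NAMELEN 64	/* hypothetical stand-in for PATHLEN */

/* Hypothetical, simplified stand-in for remote_image. */
struct img {
	char path[NAMELEN];
	struct img *next;
};

static struct img *img_head;
static pthread_rwlock_t img_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Shared (read) lock: any number of lookups may run concurrently. */
static struct img *img_lookup(const char *path)
{
	struct img *it;

	pthread_rwlock_rdlock(&img_lock);
	for (it = img_head; it != NULL; it = it->next)
		if (strncmp(it->path, path, NAMELEN) == 0)
			break;
	pthread_rwlock_unlock(&img_lock);
	return it;	/* NULL if not found */
}

/* Exclusive (write) lock: an insertion excludes lookups and other insertions. */
static int img_add(const char *path)
{
	struct img *it = calloc(1, sizeof(*it));

	if (it == NULL)
		return -1;
	strncpy(it->path, path, NAMELEN - 1);	/* calloc keeps it NUL-terminated */
	pthread_rwlock_wrlock(&img_lock);
	it->next = img_head;
	img_head = it;
	pthread_rwlock_unlock(&img_lock);
	return 0;
}
```

As in the patch, returning an entry after dropping the lock is only safe as long
as entries are not freed while other threads may still be using them.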

> 
> > +
> > +	join_workers();
> > +
> > +	pthread_join(put_thr, NULL);
> > +	pthread_join(get_thr, NULL);
> > +	return 0;
> > +}
> > diff --git a/image-proxy.c b/image-proxy.c
> > new file mode 100644
> > index 0000000..24b8935
> > --- /dev/null
> > +++ b/image-proxy.c
> > @@ -0,0 +1,74 @@
> > +#include <unistd.h>
> > +
> > +#include "image-remote.h"
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +#include <pthread.h>
> > +
> > +static char* dst_host;
> > +static unsigned short dst_port;
> > +
> > +static void* proxy_remote_image(void* ptr)
> > +{
> > +	remote_image* rimg = (remote_image*) ptr;
> > +	rimg->dst_fd = setup_TCP_client_socket(dst_host, dst_port);
> > +	if (rimg->dst_fd < 0) {
> > +		pr_perror("Unable to open recover image socket");
> > +		return NULL;
> > +	}
> > +
> > +	if (write_header(rimg->dst_fd, rimg->namespace, rimg->path) < 0) {
> > +		pr_perror("Error writing header for %s:%s",
> > +			rimg->path, rimg->namespace);
> > +		return NULL;
> > +	}
> > +
> > +	prepare_put_rimg();
> > +
> > +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
> 
> For control messages I would suggest integer field, rather than magic names
> in paths :)

Right.
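
A sketch of what an integer control field could look like. All names here
(enum rimg_msg_type, struct rimg_hdr, the RIMG_MSG_* values, HDR_PATHLEN) are
hypothetical. Since the patch encodes the header as a protobuf RemoteImageEntry,
in practice this would presumably become an extra integer field of that message
rather than a raw C struct on the wire.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define HDR_PATHLEN 64	/* hypothetical */

/* Hypothetical message types; the patch uses DUMP_FINISH as a magic path instead. */
enum rimg_msg_type {
	RIMG_MSG_IMAGE = 1,	/* an image follows; namespace/path are meaningful */
	RIMG_MSG_DUMP_FINISH,	/* control: no more images will be sent */
	RIMG_MSG_NOT_FOUND,	/* control: the requested image will never exist */
};

struct rimg_hdr {
	uint32_t type;		/* one of enum rimg_msg_type */
	char namespace[HDR_PATHLEN];
	char path[HDR_PATHLEN];
};

static void hdr_init(struct rimg_hdr *h, enum rimg_msg_type type,
		     const char *ns, const char *path)
{
	memset(h, 0, sizeof(*h));
	h->type = type;
	if (ns)
		strncpy(h->namespace, ns, HDR_PATHLEN - 1);
	if (path)
		strncpy(h->path, path, HDR_PATHLEN - 1);
}

/* Dispatch on the integer type: no string comparison against magic names. */
static int hdr_is_control(const struct rimg_hdr *h)
{
	return h->type != RIMG_MSG_IMAGE;
}
```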

> 
> > +		close(rimg->dst_fd);
> > +		finalize_put_rimg(rimg);
> > +		return NULL;
> > +	}
> > +	if (recv_remote_image(rimg->src_fd, rimg->path, &(rimg->buf_head), false) < 0) {
> > +		return NULL;
> > +	}
> > +	finalize_put_rimg(rimg);
> > +	send_remote_image(rimg->dst_fd, rimg->path, &(rimg->buf_head), true);
> > +	return NULL;
> > +}
> > +
> > +int image_proxy(char* fwd_host, unsigned short fwd_port)
> > +{
> > +	pthread_t get_thr, put_thr;
> > +	int put_fd, get_fd;
> > +
> > +	dst_host = fwd_host;
> > +	dst_port = fwd_port;
> > +
> > +	pr_info("Proxy Get Path %s, Put Path %s, Destination Host %s:%hu\n",
> > +		PROXY_GET, PROXY_PUT, fwd_host, fwd_port);
> > +
> > +	put_fd = setup_UNIX_server_socket(PROXY_PUT);
> > +	get_fd = setup_UNIX_server_socket(PROXY_GET);
> > +
> > +	if(init_daemon(proxy_remote_image))
> > +		return -1;
> > +
> > +	if (pthread_create(
> > +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> > +		pr_perror("Unable to create put thread");
> > +		return -1;
> > +	}
> > +	if (pthread_create(
> > +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> > +		pr_perror("Unable to create get thread");
> > +		return -1;
> > +	}
> > +
> > +	join_workers();
> > +
> > +	pthread_join(put_thr, NULL);
> > +	pthread_join(get_thr, NULL);
> > +	return 0;
> > +}
> > diff --git a/image-remote-pvt.c b/image-remote-pvt.c
> > new file mode 100644
> > index 0000000..285bb91
> > --- /dev/null
> > +++ b/image-remote-pvt.c
> > @@ -0,0 +1,566 @@
> > +#include <unistd.h>
> > +#include <stdlib.h>
> > +
> > +#include <semaphore.h>
> > +#include <sys/socket.h>
> > +#include <netinet/in.h>
> > +#include <netdb.h>
> > +#include <sys/un.h>
> > +#include <pthread.h>
> > +
> > +#include "image-remote-pvt.h"
> > +#include "criu-log.h"
> > +
> > +#include "protobuf.h"
> > +#include "protobuf/remote-image.pb-c.h"
> > +#include "image.h"
> > +
> > +typedef struct wthread {
> > +	pthread_t tid;
> > +	struct list_head l;
> > +} worker_thread;
> > +
> > +static LIST_HEAD(rimg_head);
> > +static pthread_mutex_t rimg_lock;
> > +static sem_t rimg_semph;
> > +
> > +static LIST_HEAD(workers_head);
> > +static pthread_mutex_t workers_lock;
> > +static sem_t workers_semph;
> > +
> > +static int finished = 0;
> > +static int putting = 0;
> > +
> > +static void* (*get_func)(void*);
> > +static void* (*put_func)(void*);
> 
> What does get and put stand for here?

Get means read, put means write. I will rename them to read and write to make
this clearer.

> 
> > +
> > +static remote_image* get_rimg_by_name(const char* namespace, const char* path)
> > +{
> > +	remote_image* rimg = NULL;
> > +	pthread_mutex_lock(&rimg_lock);
> > +	list_for_each_entry(rimg, &rimg_head, l) {
> > +		if( !strncmp(rimg->path, path, PATHLEN) &&
> > +		    !strncmp(rimg->namespace, namespace, PATHLEN)) {
> > +			pthread_mutex_unlock(&rimg_lock);
> > +			return rimg;
> > +		}
> > +	}
> > +	pthread_mutex_unlock(&rimg_lock);
> > +	return NULL;
> > +}
> > +
> > +static uint64_t sizeof_remote_buffer(struct list_head* rbuff_head)
> > +{
> > +    uint64_t res = 0;
> > +    remote_buffer* aux = NULL;
> > +    list_for_each_entry(aux, rbuff_head, l) {
> > +	res += aux->nbytes;
> > +    }
> 
> Indentation.

Okay.

> 
> > +    return res;
> > +}
> > +
> > +int init_sync_structures()
> > +{
> > +	if (pthread_mutex_init(&rimg_lock, NULL) != 0) {
> > +		pr_perror("Remote image connection mutex init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (sem_init(&rimg_semph, 0, 0) != 0) {
> > +		pr_perror("Remote image connection semaphore init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (pthread_mutex_init(&workers_lock, NULL) != 0) {
> > +		pr_perror("Workers mutex init failed");
> > +		return -1;
> > +	}
> > +
> > +	if (sem_init(&workers_semph, 0, 0) != 0) {
> > +		pr_perror("Workers semaphore init failed");
> > +		return -1;
> > +	}
> > +	return 0;
> > +}
> > +
> > +static remote_image* wait_for_image(int cli_fd, char* namespace, char* path)
> > +{
> 
> This routine waits for image to appear on proxy-cache connection, doesn't it? It looks
> like the protocol between cache and proxy is more complex than just proxy sends header
> and data over the wire, can you describe it in details?

This function is used when someone asks for an image that is not buffered yet.
For example, CRIU restore might ask for images that are still on the way. Another
possible scenario is CRIU restore asking for images that will never exist for
a particular process dump.

In such scenarios, the image cache blocks the thread handling that particular
request until either: i) a new image is added to the list, or ii) the flag
finished is set and the counter putting (the number of remote images still
being received) drops to zero.

I found this very useful: when I migrate processes, I start the cache and CRIU
restore before starting the CRIU dump/predump.
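
The waiting rule described above can also be sketched with a condition variable
instead of a semaphore. One consideration: each sem_post() wakes at most one
thread blocked in sem_wait(), so when several restore requests are blocked, an
arriving image may wake a thread that is waiting for a different image while the
thread that wanted it keeps sleeping; pthread_cond_broadcast() wakes every
waiter so each one re-checks its own condition. This is a simplified sketch
(fixed-size array instead of a list, the parent-image special case omitted), and
all names in it are hypothetical.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>

#define MAX_IMGS 16	/* hypothetical fixed capacity */
#define NAMELEN 64

static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_changed = PTHREAD_COND_INITIALIZER;
static char images[MAX_IMGS][NAMELEN];	/* names of fully received images */
static int nimages;
static bool finished;	/* DUMP_FINISH received */
static int putting;	/* images still being received */

/* Caller must hold cache_lock. */
static bool have_image_locked(const char *path)
{
	for (int i = 0; i < nimages; i++)
		if (strcmp(images[i], path) == 0)
			return true;
	return false;
}

/* Block until the image exists (true) or can no longer arrive (false). */
static bool wait_image_or_finish(const char *path)
{
	bool found;

	pthread_mutex_lock(&cache_lock);
	while (!(found = have_image_locked(path)) && !(finished && putting == 0))
		pthread_cond_wait(&cache_changed, &cache_lock);
	pthread_mutex_unlock(&cache_lock);
	return found;
}

/* Rough counterparts of prepare_put_rimg()/finalize_put_rimg(). */
static void put_started(void)
{
	pthread_mutex_lock(&cache_lock);
	putting++;
	pthread_mutex_unlock(&cache_lock);
}

static void put_finished(const char *path)
{
	pthread_mutex_lock(&cache_lock);
	if (nimages < MAX_IMGS)
		strncpy(images[nimages++], path, NAMELEN - 1);
	putting--;
	pthread_mutex_unlock(&cache_lock);
	pthread_cond_broadcast(&cache_changed);	/* wake every waiter to re-check */
}

static void dump_finished(void)
{
	pthread_mutex_lock(&cache_lock);
	finished = true;
	pthread_mutex_unlock(&cache_lock);
	pthread_cond_broadcast(&cache_changed);
}
```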

> 
> > +	remote_image *result;
> > +
> > +	while (1) {
> > +		result = get_rimg_by_name(namespace, path);
> > +
> > +		if (result != NULL) {
> > +			if (write_header(cli_fd, namespace, path) < 0) {
> > +				pr_perror("Error writing header for %s:%s",
> > +					path, namespace);
> > +				close(cli_fd);
> > +				return NULL;
> > +			}
> > +			return result;
> > +		}
> > +		/* The file does not exist and we do not expect new files */
> > +		if (finished && !putting) {
> > +			if (write_header(cli_fd, NULL_NAMESPACE, DUMP_FINISH) < 0) {
> > +				pr_perror("Error writing header for %s:%s",
> > +					DUMP_FINISH, NULL_NAMESPACE);
> > +			}
> > +			close(cli_fd);
> > +			return NULL;
> > +		}
> > +		/* The file does not exist but the request is for a parent file.
> > +		   A parent file may not exist for the first process. */
> > +		if (!putting && !strncmp(path, PARENT_IMG, PATHLEN)) {
> > +		    if (write_header(cli_fd, namespace, path) < 0) {
> > +			    pr_perror("Error writing header for %s:%s",
> > +					path, namespace);
> > +		    }
> > +		    close(cli_fd);
> > +		    return NULL;
> > +		}
> > +		sem_wait(&rimg_semph);
> > +	}
> > +}
> > +
> > +void* get_remote_image(void* fd)
> 
> Should be static.

Right.

> 
> > +{
> > +	int cli_fd = (long) fd;
> > +	remote_image* rimg = NULL;
> > +	char path_buf[PATHLEN];
> > +	char namespace_buf[PATHLEN];
> > +
> > +	if(read_header(cli_fd, namespace_buf, path_buf) < 0) {
> > +		pr_perror("Error reading header");
> > +		return NULL;
> > +	}
> > +
> > +	pr_info("Received GET for %s:%s.\n", path_buf, namespace_buf);
> > +
> > +	rimg = wait_for_image(cli_fd, namespace_buf, path_buf);
> > +	if (!rimg)
> > +		return NULL;
> > +
> > +	rimg->dst_fd = cli_fd;
> > +	send_remote_image(rimg->dst_fd, rimg->path, &rimg->buf_head, false);
> > +	return NULL;
> > +}
> > +
> > +void prepare_put_rimg()
> > +{
> > +	pthread_mutex_lock(&rimg_lock);
> > +	putting++;
> > +	pthread_mutex_unlock(&rimg_lock);
> > +}
> > +
> > +void finalize_put_rimg(remote_image* rimg)
> > +{
> > +	pthread_mutex_lock(&rimg_lock);
> > +	list_add_tail(&(rimg->l), &rimg_head);
> > +	putting--;
> > +	pthread_mutex_unlock(&rimg_lock);
> > +	sem_post(&rimg_semph);
> > +}
> > +
> > +int init_daemon(void *(*put_function) (void *))
> > +{
> > +	get_func = get_remote_image;
> > +	put_func = put_function;
> > +	return init_sync_structures();
> > +}
> > +
> > +int setup_TCP_server_socket(int port)
> > +{
> > +	struct sockaddr_in serv_addr;
> > +	int sockopt = 1;
> > +
> > +	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> > +	if (sockfd < 0) {
> > +		pr_perror("Unable to open image socket");
> > +		return -1;
> > +	}
> > +
> > +	bzero((char *) &serv_addr, sizeof (serv_addr));
> > +	serv_addr.sin_family = AF_INET;
> > +	serv_addr.sin_addr.s_addr = INADDR_ANY;
> > +	serv_addr.sin_port = htons(port);
> > +
> > +	if (setsockopt(
> > +	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof (sockopt)) == -1) {
> > +		pr_perror("Unable to set SO_REUSEADDR");
> > +		return -1;
> > +	}
> > +
> > +	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
> > +		pr_perror("Unable to bind image socket");
> > +		return -1;
> > +	}
> > +
> > +	if (listen(sockfd, DEFAULT_LISTEN)) {
> > +		pr_perror("Unable to listen image socket");
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +int setup_TCP_client_socket(char* hostname, int port)
> > +{
> > +	int sockfd;
> > +	struct sockaddr_in serv_addr;
> > +	struct hostent *server;
> > +
> > +	sockfd = socket(AF_INET, SOCK_STREAM, 0);
> > +	if (sockfd < 0) {
> > +		pr_perror("Unable to open remote image socket");
> > +		return -1;
> > +	}
> > +
> > +	server = gethostbyname(hostname);
> > +	if (server == NULL) {
> > +		pr_perror("Unable to get host by name (%s)", hostname);
> > +		return -1;
> > +	}
> > +
> > +	bzero((char *) &serv_addr, sizeof (serv_addr));
> > +	serv_addr.sin_family = AF_INET;
> > +	bcopy((char *) server->h_addr,
> > +	      (char *) &serv_addr.sin_addr.s_addr,
> > +	      server->h_length);
> > +	serv_addr.sin_port = htons(port);
> > +
> > +	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> > +		pr_perror("Unable to connect to remote restore host %s", hostname);
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +int setup_UNIX_server_socket(char* path)
> > +{
> > +	struct sockaddr_un addr;
> > +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> > +	if(sockfd < 0) {
> > +		pr_perror("Unable to open image socket");
> > +		return -1;
> > +	}
> > +
> > +	memset(&addr, 0, sizeof(addr));
> > +	addr.sun_family = AF_UNIX;
> > +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> > +
> > +	unlink(path);
> > +
> > +	if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
> > +		pr_perror("Unable to bind image socket");
> > +		return -1;
> > +	}
> > +
> > +	if (listen(sockfd, 50) == -1) {
> > +		pr_perror("Unable to listen image socket");
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +int setup_UNIX_client_socket(char* path)
> > +{
> > +	struct sockaddr_un addr;
> > +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> > +	if (sockfd < 0) {
> > +		pr_perror("Unable to open local image socket");
> > +		return -1;
> > +	}
> > +
> > +	memset(&addr, 0, sizeof(addr));
> > +	addr.sun_family = AF_UNIX;
> > +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> > +
> > +	if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
> > +		pr_perror("Unable to connect to local socket: %s", path);
> > +		return -1;
> > +	}
> > +
> > +	return sockfd;
> > +}
> > +
> > +int pb_write_obj(int fd, void* obj, int type)
> 
> These pb_* helpers are not used outside the .c file, so should be static
> and removed from headers.

These two helpers are also used inside image-remote.c to send namespaces.

> 
> > +{
> > +	struct cr_img img;
> > +	img._x.fd = fd;
> > +	bfd_setraw(&img._x);
> > +	return pb_write_one(&img, obj, type);
> > +}
> > +
> > +int pb_read_obj(int fd, void** pobj, int type)
> > +{
> > +	struct cr_img img;
> > +	img._x.fd = fd;
> > +	bfd_setraw(&img._x);
> > +	return do_pb_read_one(&img, pobj, type, true);
> > +}
> > +
> > +int write_header(int fd, char* namespace, char* path)
> > +{
> > +	RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
> > +	ri.name = path;
> > +	ri.namespace_ = namespace;
> > +	return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
> > +}
> > +
> > +int read_header(int fd, char* namespace, char* path)
> > +{
> > +	RemoteImageEntry* ri = NULL;
> > +	int ret = pb_read_obj(fd, (void**)&ri, PB_REMOTE_IMAGE);
> > +	if (ret > 0) {
> > +		strncpy(namespace, ri->namespace_, PATHLEN);
> > +		strncpy(path, ri->name, PATHLEN);
> > +	}
> > +	free(ri);
> > +	return ret;
> > +}
> > +
> > +static void add_worker(pthread_t tid)
> > +{
> > +	worker_thread* wthread = malloc(sizeof(worker_thread));
> > +	if(!wthread) {
> > +		pr_perror("Unable to allocate worker thread structure");
> > +	}
> > +	wthread->tid = tid;
> > +	pthread_mutex_lock(&workers_lock);
> > +	list_add_tail(&(wthread->l), &workers_head);
> > +	pthread_mutex_unlock(&workers_lock);
> > +	sem_post(&workers_semph);
> > +}
> > +
> > +void join_workers()
> > +{
> > +	worker_thread* wthread = NULL;
> > +	while (1) {
> > +	    if (list_empty(&workers_head)) {
> > +		    sem_wait(&workers_semph);
> > +		    continue;
> > +	    }
> > +	    wthread = list_entry(workers_head.next, worker_thread, l);
> > +	    if (pthread_join(wthread->tid, NULL)) {
> > +		    pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
> > +	    }
> > +	    else {
> > +		    list_del(&(wthread->l));
> > +		    free(wthread);
> > +	    }
> > +
> > +	}
> > +}
> > +
> > +void* accept_get_image_connections(void* port)
> > +{
> > +	socklen_t clilen;
> > +	long cli_fd;
> > +	pthread_t tid;
> > +	int get_fd = *((int*) port);
> > +	struct sockaddr_in cli_addr;
> > +	clilen = sizeof (cli_addr);
> > +
> > +	while (1) {
> > +		cli_fd = accept(get_fd, (struct sockaddr *) &cli_addr, &clilen);
> > +		if (cli_fd < 0) {
> > +			pr_perror("Unable to accept get image connection");
> > +			return NULL;
> > +		}
> > +
> > +		if (pthread_create(
> > +		    &tid, NULL, get_func, (void*) cli_fd)) {
> > +			pr_perror("Unable to create get thread");
> > +			return NULL;
> > +		}
> > +
> > +		add_worker(tid);
> > +	}
> > +}
> > +
> > +void* accept_put_image_connections(void* port)
> > +{
> > +	socklen_t clilen;
> > +	int cli_fd;
> > +	pthread_t tid;
> > +	int put_fd = *((int*) port);
> > +	struct sockaddr_in cli_addr;
> > +	clilen = sizeof(cli_addr);
> > +	char path_buf[PATHLEN];
> > +	char namespace_buf[PATHLEN];
> > +
> > +	while (1) {
> > +
> > +		cli_fd = accept(put_fd, (struct sockaddr *) &cli_addr, &clilen);
> > +		if (cli_fd < 0) {
> > +			pr_perror("Unable to accept put image connection");
> > +			return NULL;
> > +		}
> > +
> > +		if (read_header(cli_fd, namespace_buf, path_buf) < 0) {
> > +		    pr_perror("Error reading header");
> > +		    continue;
> > +		}
> > +
> > +		remote_image* rimg = get_rimg_by_name(namespace_buf, path_buf);
> > +
> > +		pr_info("Received PUT request for %s:%s\n", path_buf, namespace_buf);
> > +
> > +		if (rimg == NULL) {
> > +			rimg = malloc(sizeof (remote_image));
> > +			if (rimg == NULL) {
> > +				pr_perror("Unable to allocate remote_image structures");
> > +				return NULL;
> > +			}
> > +
> > +			remote_buffer* buf = malloc(sizeof (remote_buffer));
> > +			if (buf == NULL) {
> > +				pr_perror("Unable to allocate remote_buffer structures");
> > +				return NULL;
> > +			}
> > +
> > +			strncpy(rimg->path, path_buf, PATHLEN);
> > +			strncpy(rimg->namespace, namespace_buf, PATHLEN);
> > +			buf->nbytes = 0;
> > +			INIT_LIST_HEAD(&(rimg->buf_head));
> > +			list_add_tail(&(buf->l), &(rimg->buf_head));
> > +		}
> > +		/* NOTE: we implement a PUT by clearing the previous file. */
> > +		else {
> > +		    pr_info("Clearing previous images for %s:%s\n",
> > +			    path_buf, namespace_buf);
> > +			pthread_mutex_lock(&rimg_lock);
> > +			list_del(&(rimg->l));
> > +			pthread_mutex_unlock(&rimg_lock);
> > +			while (!list_is_singular(&(rimg->buf_head))) {
> > +				list_del(rimg->buf_head.prev);
> > +			}
> > +			list_entry(rimg->buf_head.next, remote_buffer, l)->nbytes = 0;
> > +		}
> > +		rimg->src_fd = cli_fd;
> > +		rimg->dst_fd = -1;
> > +
> > +		if (pthread_create(
> > +		    &tid, NULL, put_func, (void*) rimg)) {
> > +			pr_perror("Unable to create put thread");
> > +			return NULL;
> > +		}
> > +
> > +		pr_info("Serving PUT request for %s:%s (tid=%lu)\n",
> > +			rimg->path, rimg->namespace, (unsigned long) tid);
> > +
> > +		add_worker(tid);
> > +
> > +		if (!strncmp(path_buf, DUMP_FINISH, sizeof (DUMP_FINISH))) {
> > +			finished = 1;
> > +			pr_info("Received DUMP FINISH\n");
> > +			sem_post(&rimg_semph);
> > +		}
> > +	}
> > +}
> > +
> > +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> 
> This routine makes sense only on the cache side, am I right? If yes, it would be nice
> to split the code into different .c files so that when reading the code it is clear
> which one of the cache-proxy pair we're at.

No, it is also used by the image-proxy to receive images from CRIU dump/predump.
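
Since the same routine serves both sides, its core can be sketched independently
of who is on the other end: read until EOF, filling fixed-size blocks one after
another. This is a minimal sketch; struct blk, recv_into_blocks(), free_blocks()
and BLK_SIZE are hypothetical, and a plain singly linked list replaces CRIU's
list_head.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define BLK_SIZE 4096	/* same value as BUF_SIZE in the patch */

/* Hypothetical block chain; the patch uses remote_buffer + list_head. */
struct blk {
	struct blk *next;
	size_t nbytes;		/* bytes used in data[] */
	char data[BLK_SIZE];
};

static void free_blocks(struct blk *b)
{
	while (b != NULL) {
		struct blk *next = b->next;
		free(b);
		b = next;
	}
}

/*
 * Read fd until EOF, filling blocks one after another.
 * Returns the byte count and sets *head, or returns -1 on error.
 * The fd is left open; the caller decides when to close it.
 */
static long long recv_into_blocks(int fd, struct blk **head)
{
	struct blk *first = calloc(1, sizeof(*first));
	struct blk *cur = first;
	long long total = 0;

	if (first == NULL)
		return -1;
	for (;;) {
		ssize_t n;

		if (cur->nbytes == BLK_SIZE) {	/* block full: chain a new one */
			cur->next = calloc(1, sizeof(*cur->next));
			if (cur->next == NULL)
				goto err;
			cur = cur->next;
		}
		n = read(fd, cur->data + cur->nbytes, BLK_SIZE - cur->nbytes);
		if (n == 0)
			break;			/* EOF: the sender finished the image */
		if (n < 0) {
			if (errno == EINTR)
				continue;
			goto err;
		}
		cur->nbytes += (size_t)n;
		total += n;
	}
	*head = first;
	return total;
err:
	free_blocks(first);
	return -1;
}
```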

> 
> > +{
> > +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> > +	int n, nblocks = 0;
> > +	uint64_t image_bytes = 0;
> > +	uint64_t recv_bytes = 0;
> > +
> > +	if (image_check && (read(fd, &image_bytes, sizeof(uint64_t)) != sizeof(uint64_t))) {
> > +		pr_perror("Unable to read size of the image %s", path);
> > +		close(fd);
> > +		return -1;
> > +	}
> > +
> > +	while(1) {
> > +		n = read(fd,
> > +			 curr_buf->buffer + curr_buf->nbytes,
> > +			 BUF_SIZE - curr_buf->nbytes);
> > +		if (n == 0 && image_check && image_bytes != recv_bytes) {
> > +			pr_info("Failed receiving %s (received %lu instead of %lu)\n",
> > +			    path, recv_bytes, image_bytes);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +		else if (n == 0) {
> > +			pr_info("Finished receiving %s (%d full blocks, %d bytes on last block)\n",
> > +			    path, nblocks, curr_buf->nbytes);
> > +			close(fd);
> > +			return recv_bytes;
> > +		}
> > +		else if (n > 0) {
> > +			curr_buf->nbytes += n;
> > +			recv_bytes = nblocks*BUF_SIZE + curr_buf->nbytes;
> > +			if(curr_buf->nbytes == BUF_SIZE) {
> > +				remote_buffer* buf = malloc(sizeof(remote_buffer));
> > +				if (buf == NULL) {
> > +					pr_perror("Unable to allocate remote_buffer structures");
> > +					close(fd);
> > +					return -1;
> > +				}
> > +				buf->nbytes = 0;
> > +				list_add_tail(&(buf->l), rbuff_head);
> > +				curr_buf = buf;
> > +				nblocks++;
> > +			}
> > +		}
> > +		else {
> > +			pr_perror("Read on %s socket failed", path);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +	}
> > +}
> > +
> > +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> 
> And this one is the opposite -- for the proxy.

Same here. This function is used inside the proxy to forward images, but it is
also used by the cache to send images to CRIU restore.
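
For the size-prefixed path (image_check), the framing can be sketched as below.
This sketch handles two points explicitly: send() may transfer fewer bytes than
requested, so both directions loop until the full length has been moved; and the
length is encoded big-endian, whereas the patch sends the raw uint64_t, which
assumes both hosts share a byte order. The function names are hypothetical.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Write exactly len bytes, retrying on short writes and EINTR. */
static int write_full(int fd, const void *buf, size_t len)
{
	const char *p = buf;

	while (len > 0) {
		ssize_t n = send(fd, p, len, MSG_NOSIGNAL);
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;	/* e.g. EPIPE/ECONNRESET: the peer went away */
		}
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Read exactly len bytes; returns 0, or -1 on error or premature EOF. */
static int read_full(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len > 0) {
		ssize_t n = read(fd, p, len);
		if (n < 0 && errno == EINTR)
			continue;
		if (n <= 0)
			return -1;
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Send an 8-byte big-endian length, then the payload. */
static int send_framed(int fd, const void *data, uint64_t len)
{
	unsigned char hdr[8];

	for (int i = 0; i < 8; i++)
		hdr[i] = (unsigned char)(len >> (56 - 8 * i));
	if (write_full(fd, hdr, sizeof(hdr)))
		return -1;
	return write_full(fd, data, (size_t)len);
}

/* Receive one frame into buf (capacity cap); returns payload length or -1. */
static long long recv_framed(int fd, void *buf, uint64_t cap)
{
	unsigned char hdr[8];
	uint64_t len = 0;

	if (read_full(fd, hdr, sizeof(hdr)))
		return -1;
	for (int i = 0; i < 8; i++)
		len = (len << 8) | hdr[i];
	if (len > cap || read_full(fd, buf, (size_t)len))
		return -1;
	return (long long)len;
}
```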

> 
> > +{
> > +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> > +	int n, curr_offset, nblocks;
> > +
> > +	nblocks = 0;
> > +	curr_offset = 0;
> > +
> > +	if (image_check) {
> > +		uint64_t image_bytes = sizeof_remote_buffer(rbuff_head);
> > +		n = send(fd, &image_bytes, sizeof(uint64_t), MSG_NOSIGNAL);
> > +		if (n < sizeof(uint64_t)) {
> > +			pr_perror("Unable to send size of the image %s", path);
> > +			close(fd);
> > +			return -1;
> > +		}
> > +	}
> > +
> > +	while(1) {
> > +		n = send(
> > +		    fd,
> > +		    curr_buf->buffer + curr_offset,
> > +		    MIN(BUF_SIZE, curr_buf->nbytes) - curr_offset,
> > +		    MSG_NOSIGNAL);
> > +		if (n > -1) {
> > +			curr_offset += n;
> > +			if (curr_offset == BUF_SIZE) {
> > +				curr_buf =
> > +				    list_entry(curr_buf->l.next, remote_buffer, l);
> > +				nblocks++;
> > +				curr_offset = 0;
> > +			}
> > +			else if (curr_offset == curr_buf->nbytes) {
> > +				pr_info("Finished forwarding %s (%d full blocks, %d bytes on last block)\n",
> > +					path, nblocks, curr_offset);
> > +				close(fd);
> > +			       return nblocks*BUF_SIZE + curr_buf->nbytes;
> > +			}
> > +		}
> > +		else if (errno == EPIPE || errno == ECONNRESET) {
> > +			pr_warn("Connection for %s was closed earlier than expected\n",
> > +				path);
> > +			return 0;
> > +		}
> > +		else {
> > +			pr_perror("Write on %s socket failed", path);
> > +			return -1;
> > +		}
> > +	}
> > +}
> > diff --git a/include/image-remote-pvt.h b/include/image-remote-pvt.h
> > new file mode 100644
> > index 0000000..0a20e6d
> > --- /dev/null
> > +++ b/include/image-remote-pvt.h
> > @@ -0,0 +1,52 @@
> > +#ifndef IMAGE_REMOTE_PVT_H
> > +#define	IMAGE_REMOTE_PVT_H
> > +
> > +#include <stdbool.h>
> > +#include "list.h"
> > +#include "image-remote.h"
> > +
> > +#define DEFAULT_LISTEN 50
> > +#define PAGESIZE 4096
> > +#define BUF_SIZE PAGESIZE
> > +
> > +typedef struct rbuf {
> > +	char buffer[BUF_SIZE];
> > +	int nbytes; /* How many bytes are in the buffer. */
> > +	struct list_head l;
> > +} remote_buffer;
> 
> Minor, but still. I'd prefer plain structures as datatypes for objects that
> are used internally in some subsystem.

Sorry, I didn't get the idea...
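
One likely reading of the comment is the kernel/CRIU style of using plain struct
tags rather than typedefs for internal types, i.e. spelling the type as
struct remote_buffer everywhere. A sketch of remote_buffer in that style; the
struct list_head below is a minimal stand-in for CRIU's list.h, defined only so
the example compiles on its own, and remote_buffer_free_space() is a
hypothetical helper.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal stand-in for CRIU's struct list_head (assumption, for self-containment). */
struct list_head {
	struct list_head *next, *prev;
};

#define BUF_SIZE 4096

/* No typedef: the type is always written as "struct remote_buffer". */
struct remote_buffer {
	char buffer[BUF_SIZE];
	int nbytes;		/* how many bytes are in the buffer */
	struct list_head l;
};

/* Hypothetical helper: room left in a buffer. */
static int remote_buffer_free_space(const struct remote_buffer *rb)
{
	return BUF_SIZE - rb->nbytes;
}
```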

> 
> > +
> > +typedef struct rimg {
> > +	char path[PATHLEN];
> > +	char namespace[PATHLEN];
> > +	int src_fd;
> > +	int dst_fd;
> > +	struct list_head l;
> > +	struct list_head buf_head;
> > +
> > +} remote_image;
> > +
> > +int init_daemon(void *(*put_function) (void *));
> > +
> > +void join_workers();
> > +
> > +void prepare_put_rimg();
> > +void finalize_put_rimg(remote_image* rimg);
> > +
> > +void* accept_get_image_connections(void* port);
> > +void* accept_put_image_connections(void* port);
> > +
> > +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> > +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> > +
> > +int pb_write_obj(int fd, void* obj, int type);
> > +int pb_read_obj(int fd, void** obj, int type);
> > +
> > +int write_header(int fd, char* namespace, char* path);
> > +int read_header(int fd, char* namespace, char* path);
> > +
> > +int setup_TCP_server_socket(int port);
> > +int setup_TCP_client_socket(char* hostname, int port);
> > +int setup_UNIX_client_socket(char* path);
> > +int setup_UNIX_server_socket(char* path);
> > +#endif	/* IMAGE_REMOTE_PVT_H */
> > +
> > 
> 
> -- Pavel


-- 
Rodrigo Bruno <rbruno at gsd.inesc-id.pt>

