[CRIU] Process Migration using Sockets v2 - Patch 2/2

Rodrigo Bruno rbruno at gsd.inesc-id.pt
Tue Oct 13 07:14:59 PDT 2015


On Mon, 12 Oct 2015 14:56:00 +0300
Pavel Emelyanov <xemul at parallels.com> wrote:

> On 10/12/2015 02:11 PM, Rodrigo Bruno wrote:
> > On Mon, 12 Oct 2015 12:38:43 +0300
> > Pavel Emelyanov <xemul at parallels.com> wrote:
> > 
> >> On 10/09/2015 05:46 PM, Rodrigo Bruno wrote:
> >>> Hi,
> >>>
> >>> here goes the second part.
> >>
> >> Thanks, this time it's more clear. Please, find my comments inline.
> >>
> >>> This patch adds:
> >>>
> >>> 1. image-cache.c - the component that is on the same node as CRIU restore. It holds 
> >>> image files in memory waiting for CRIU restore to ask them
> >>>
> >>> 2. image-proxy.c - the component that is on the same node as CRIU dump. It holds
> >>> image files in memory (because a future dump/predump might need them) and also
> >>> forwards them to the image-cache.
> >>>
> >>> 3. image-remote-pvt.c - implements almost all the image receiving and sending
> >>> engine.
> >>
> >> What does the "pvt" stand for?
> > 
> > Private. Maybe "prvt" would be better. The idea is that existing CRIU code is
> > transparent to this code since this is only used by the proxy, cache, and 
> > image-remote.
> 
> Ah, I see. Since all this stuff sits in one dir, then private is not applicable here :)
> I'd call those img-cache, img-proxy and img-remote.

image-proxy.c -> img-proxy.c
image-cache.c -> img-cache.c
image-remote-pvt.c -> img-remote.c
image-remote.c -> (stays the same?)

> 
> >>
> >>> 4. include/image-remote-pvt.h - exports function declarations used by both image-cache
> >>> image-proxy and image-remote.
> >>>
> >>>
> >>> Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
> >>> From e690070808e6f1b459f033e162c910bc2d02859a Mon Sep 17 00:00:00 2001
> >>> From: rodrigo-bruno <rbruno at gsd.inesc-id.pt>
> >>> Date: Fri, 9 Oct 2015 15:37:17 +0100
> >>> Subject: [PATCH] Process migration using sockets (2/2)
> >>>
> >>> ---
> >>>  image-cache.c              |  54 +++++
> >>>  image-proxy.c              |  74 ++++++
> >>>  image-remote-pvt.c         | 566 +++++++++++++++++++++++++++++++++++++++++++++
> >>>  include/image-remote-pvt.h |  52 +++++
> >>>  4 files changed, 746 insertions(+)
> >>>  create mode 100644 image-cache.c
> >>>  create mode 100644 image-proxy.c
> >>>  create mode 100644 image-remote-pvt.c
> >>>  create mode 100644 include/image-remote-pvt.h
> >>>
> >>> diff --git a/image-cache.c b/image-cache.c
> >>> new file mode 100644
> >>> index 0000000..e2030f4
> >>> --- /dev/null
> >>> +++ b/image-cache.c
> >>> @@ -0,0 +1,54 @@
> >>> +#include <unistd.h>
> >>> +
> >>> +#include "image-remote.h"
> >>> +#include "image-remote-pvt.h"
> >>> +#include "criu-log.h"
> >>> +#include <pthread.h>
> >>> +
> >>> +static void* cache_remote_image(void* ptr)
> >>> +{
> >>> +	remote_image* rimg = (remote_image*) ptr;
> >>> +
> >>> +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof (DUMP_FINISH)))
> >>> +	{
> >>> +		close(rimg->src_fd);
> >>> +		return NULL;
> >>> +	}
> >>> +
> >>> +	prepare_put_rimg();
> >>> +	recv_remote_image(rimg->src_fd, rimg->path, &rimg->buf_head, true);
> >>> +	finalize_put_rimg(rimg);
> >>> +
> >>> +	return NULL;
> >>> +}
> >>> +
> >>> +int image_cache(unsigned short cache_put_port)
> >>> +{
> >>> +	pthread_t get_thr, put_thr;
> >>> +	int put_fd, get_fd;
> >>> +
> >>> +	pr_info("Put Port %d, Get Path %s\n", cache_put_port, CACHE_GET);
> >>> +
> >>> +	put_fd = setup_TCP_server_socket(cache_put_port);
> >>
> >> Proxy and cache should be able to work over given connections. Look at the
> >> opts.ps_socket field for details how this works in case of page-server.
> > 
> > Okay, this is related to the previous comment on multiple connections. I will
> > have a look at it and try to mimic it in the proxy-cache communication.
> 
> OK, thanks.
> 
> >>
> >>> +	get_fd = setup_UNIX_server_socket(CACHE_GET);
> >>> +
> >>> +	if (init_daemon(cache_remote_image))
> >>> +		return -1;
> >>> +
> >>> +	if (pthread_create(
> >>> +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> >>> +		pr_perror("Unable to create put thread");
> >>> +		return -1;
> >>> +	}
> >>> +	if (pthread_create(
> >>> +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> >>> +		pr_perror("Unable to create get thread");
> >>> +		return -1;
> >>> +	}
> >>
> >> Can you describe the logic behind the multithreading and sems/mutexes you use?
> > 
> > I use two sems and two mutexes, one sem and one mutex to work with worker threads list,
> > and the other mutex and sem to work with the remote image list.
> > 
> > Worker threads are created to serve a specific request, either read or write an image.
> > When the thread finishes reading or writing the image, it exits. There are a couple
> > of auxiliary threads: one for receiving read connections, one for receiving write
> > connections, and one for joining other threads.
> > 
> > Mutexes are used to make sure that no one is changing or searching the list at the
> > same time. Looking at the code now I realize that I do not allow multiple searches
> > in parallel. This can be solved by adding an additional mutex (reader mutex).
> > 
> > Semaphores are used to indicate that a new item was added to the list or, in a 
> > special condition for the remote image list, to indicate that no more images will
> > come.
> 
> OK, so the semaphores act as wake-up-s, right?

Yes.
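
To make the wake-up role concrete, here is a minimal sketch of the pattern, not the
code from the patch. It assumes a fixed-size array instead of the linked list and a
single waiting thread; the names (publish, publish_finished, wait_for, wakeup_init)
are hypothetical. The mutex only protects the shared state, while the semaphore is
posted whenever that state changes so that a sleeping waiter re-checks it.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define MAX_ITEMS 16	/* hypothetical bound; the patch uses a linked list */

static int items[MAX_ITEMS];
static int nitems;
static bool finished;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t wakeup;

int wakeup_init(void)
{
	nitems = 0;
	finished = false;
	return sem_init(&wakeup, 0, 0);
}

/* Add an item under the mutex, then post the semaphore as a wake-up. */
int publish(int id)
{
	int ret = -1;

	pthread_mutex_lock(&lock);
	assert(nitems <= MAX_ITEMS);
	if (nitems < MAX_ITEMS) {
		items[nitems++] = id;
		ret = 0;
	}
	pthread_mutex_unlock(&lock);
	if (ret == 0)
		sem_post(&wakeup);
	return ret;
}

/* Signal that no more items will come; also a wake-up. */
void publish_finished(void)
{
	pthread_mutex_lock(&lock);
	finished = true;
	pthread_mutex_unlock(&lock);
	sem_post(&wakeup);
}

/* Returns true once the item is present, false if it will never arrive. */
bool wait_for(int id)
{
	for (;;) {
		bool found = false, done;
		int i;

		pthread_mutex_lock(&lock);
		for (i = 0; i < nitems; i++)
			if (items[i] == id)
				found = true;
		done = finished;
		pthread_mutex_unlock(&lock);

		if (found)
			return true;
		if (done)
			return false;
		sem_wait(&wakeup);	/* sleep until the state changes */
	}
}
```

One caveat of this pattern: each sem_post() wakes at most one waiter, so with several
waiting threads a condition variable with pthread_cond_broadcast() may be a better fit.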

> 
> >>
> >>> +
> >>> +	join_workers();
> >>> +
> >>> +	pthread_join(put_thr, NULL);
> >>> +	pthread_join(get_thr, NULL);
> >>> +	return 0;
> >>> +}
> >>> diff --git a/image-proxy.c b/image-proxy.c
> >>> new file mode 100644
> >>> index 0000000..24b8935
> >>> --- /dev/null
> >>> +++ b/image-proxy.c
> >>> @@ -0,0 +1,74 @@
> >>> +#include <unistd.h>
> >>> +
> >>> +#include "image-remote.h"
> >>> +#include "image-remote-pvt.h"
> >>> +#include "criu-log.h"
> >>> +#include <pthread.h>
> >>> +
> >>> +static char* dst_host;
> >>> +static unsigned short dst_port;
> >>> +
> >>> +static void* proxy_remote_image(void* ptr)
> >>> +{
> >>> +	remote_image* rimg = (remote_image*) ptr;
> >>> +	rimg->dst_fd = setup_TCP_client_socket(dst_host, dst_port);
> >>> +	if (rimg->dst_fd < 0) {
> >>> +		pr_perror("Unable to open recover image socket");
> >>> +		return NULL;
> >>> +	}
> >>> +
> >>> +	if (write_header(rimg->dst_fd, rimg->namespace, rimg->path) < 0) {
> >>> +		pr_perror("Error writing header for %s:%s",
> >>> +			rimg->path, rimg->namespace);
> >>> +		return NULL;
> >>> +	}
> >>> +
> >>> +	prepare_put_rimg();
> >>> +
> >>> +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
> >>
> >> For control messages I would suggest integer field, rather than magic names
> >> in paths :)
> > 
> > Right.
> > 
> >>
> >>> +		close(rimg->dst_fd);
> >>> +		finalize_put_rimg(rimg);
> >>> +		return NULL;
> >>> +	}
> >>> +	if (recv_remote_image(rimg->src_fd, rimg->path, &(rimg->buf_head), false) < 0) {
> >>> +		return NULL;
> >>> +	}
> >>> +	finalize_put_rimg(rimg);
> >>> +	send_remote_image(rimg->dst_fd, rimg->path, &(rimg->buf_head), true);
> >>> +	return NULL;
> >>> +}
> >>> +
> >>> +int image_proxy(char* fwd_host, unsigned short fwd_port)
> >>> +{
> >>> +	pthread_t get_thr, put_thr;
> >>> +	int put_fd, get_fd;
> >>> +
> >>> +	dst_host = fwd_host;
> >>> +	dst_port = fwd_port;
> >>> +
> >>> +	pr_info("Proxy Get Path %s, Put Path %s, Destination Host %s:%hu\n",
> >>> +		PROXY_GET, PROXY_PUT, fwd_host, fwd_port);
> >>> +
> >>> +	put_fd = setup_UNIX_server_socket(PROXY_PUT);
> >>> +	get_fd = setup_UNIX_server_socket(PROXY_GET);
> >>> +
> >>> +	if(init_daemon(proxy_remote_image))
> >>> +		return -1;
> >>> +
> >>> +	if (pthread_create(
> >>> +	    &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> >>> +		pr_perror("Unable to create put thread");
> >>> +		return -1;
> >>> +	}
> >>> +	if (pthread_create(
> >>> +	    &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> >>> +		pr_perror("Unable to create get thread");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	join_workers();
> >>> +
> >>> +	pthread_join(put_thr, NULL);
> >>> +	pthread_join(get_thr, NULL);
> >>> +	return 0;
> >>> +}
> >>> diff --git a/image-remote-pvt.c b/image-remote-pvt.c
> >>> new file mode 100644
> >>> index 0000000..285bb91
> >>> --- /dev/null
> >>> +++ b/image-remote-pvt.c
> >>> @@ -0,0 +1,566 @@
> >>> +#include <unistd.h>
> >>> +#include <stdlib.h>
> >>> +
> >>> +#include <semaphore.h>
> >>> +#include <sys/socket.h>
> >>> +#include <netinet/in.h>
> >>> +#include <netdb.h>
> >>> +#include "sys/un.h"
> >>> +#include <pthread.h>
> >>> +
> >>> +#include "image-remote-pvt.h"
> >>> +#include "criu-log.h"
> >>> +
> >>> +#include "protobuf.h"
> >>> +#include "protobuf/remote-image.pb-c.h"
> >>> +#include "image.h"
> >>> +
> >>> +typedef struct wthread {
> >>> +	pthread_t tid;
> >>> +	struct list_head l;
> >>> +} worker_thread;
> >>> +
> >>> +static LIST_HEAD(rimg_head);
> >>> +static pthread_mutex_t rimg_lock;
> >>> +static sem_t rimg_semph;
> >>> +
> >>> +static LIST_HEAD(workers_head);
> >>> +static pthread_mutex_t workers_lock;
> >>> +static sem_t workers_semph;
> >>> +
> >>> +static int finished = 0;
> >>> +static int putting = 0;
> >>> +
> >>> +static void* (*get_func)(void*);
> >>> +static void* (*put_func)(void*);
> >>
> >> What does get and put stand for here?
> > 
> > Get means read, put means write. I will refactor these two to read and
> > write to simplify.
> > 
> >>
> >>> +
> >>> +static remote_image* get_rimg_by_name(const char* namespace, const char* path)
> >>> +{
> >>> +	remote_image* rimg = NULL;
> >>> +	pthread_mutex_lock(&rimg_lock);
> >>> +	list_for_each_entry(rimg, &rimg_head, l) {
> >>> +		if( !strncmp(rimg->path, path, PATHLEN) &&
> >>> +		    !strncmp(rimg->namespace, namespace, PATHLEN)) {
> >>> +			pthread_mutex_unlock(&rimg_lock);
> >>> +			return rimg;
> >>> +		}
> >>> +	}
> >>> +	pthread_mutex_unlock(&rimg_lock);
> >>> +	return NULL;
> >>> +}
> >>> +
> >>> +static uint64_t sizeof_remote_buffer(struct list_head* rbuff_head)
> >>> +{
> >>> +    uint64_t res = 0;
> >>> +    remote_buffer* aux = NULL;
> >>> +    list_for_each_entry(aux, rbuff_head, l) {
> >>> +	res += aux->nbytes;
> >>> +    }
> >>
> >> Indentation.
> > 
> > Okay.
> > 
> >>
> >>> +    return res;
> >>> +}
> >>> +
> >>> +int init_sync_structures()
> >>> +{
> >>> +	if (pthread_mutex_init(&rimg_lock, NULL) != 0) {
> >>> +		pr_perror("Remote image connection mutex init failed");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (sem_init(&rimg_semph, 0, 0) != 0) {
> >>> +		pr_perror("Remote image connection semaphore init failed");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (pthread_mutex_init(&workers_lock, NULL) != 0) {
> >>> +		pr_perror("Workers mutex init failed");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (sem_init(&workers_semph, 0, 0) != 0) {
> >>> +		pr_perror("Workers semaphore init failed");
> >>> +		return -1;
> >>> +	}
> >>> +	return 0;
> >>> +}
> >>> +
> >>> +static remote_image* wait_for_image(int cli_fd, char* namespace, char* path)
> >>> +{
> >>
> >> This routine waits for image to appear on proxy-cache connection, doesn't it? It looks
> >> like the protocol between cache and proxy is more complex than just proxy sends header
> >> and data over the wire, can you describe it in details?
> > 
> > This function is used when someone asks for an image that is still not buffered.
> > For example, CRIU restore might ask for images that are still on the way. Another
> > possible scenario is for CRIU restore to ask for images that will never exist for
> > a particular process dump.
> > 
> > If such scenarios occur, the image cache will hold the thread dealing with the
> > particular request until either: i) a new image is added to the list, or ii) the
> > flag finished is set to true and the flag putting (aka receiving remote image) is
> > false.
> > 
> > I found this very useful. When I migrate processes, I start the cache and CRIU
> > restore before starting the CRIU dump/predump.
> 
> OK this is understood. Can you also describe the protocol between cache and proxy,
> i.e. the one over the remote (TCP) connection?

Yes. When the proxy finishes receiving a file from CRIU dump/predump, it:

1- opens a write connection to the image-cache
2- writes the image header (name + namespace), a protobuf object
3- writes the image size (uint64_t)
4- writes the image
5- closes the connection


The image-cache side:
1- accepts the connection
2- reads the header
3- reads the size
4- reads the image
5- when read() returns 0, checks that the number of received bytes matches the size.

This will be changed to use only one connection (inherited from the user).

It will look like this:
image-proxy:
1- writes the image header (name + namespace + size), a protobuf object
2- writes the image
3- writes a closing header (to replace the close)

image-cache:
1- reads the image header
2- reads the image
3- reads the closing header (it knows when this header should come because it knows
the size of the image).
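
A rough sketch of the single-connection framing described above, under some loud
assumptions: it uses a fixed-layout C struct instead of the protobuf header, both
ends share the same endianness, and all names (struct frame_hdr, FRAME_OPEN,
FRAME_CLOSE, send_image, recv_image, NAME_LEN) are hypothetical. The integer type
field also plays the role of a control message, instead of a magic name in the path.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

#define NAME_LEN 64	/* hypothetical; stands in for PATHLEN */

enum frame_type { FRAME_OPEN = 1, FRAME_CLOSE = 2 };

struct frame_hdr {
	uint32_t type;			/* FRAME_OPEN or FRAME_CLOSE */
	uint64_t size;			/* image size in bytes */
	char namespace[NAME_LEN];
	char name[NAME_LEN];
};

/* Write exactly len bytes, retrying on short writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len)
{
	const char *p = buf;

	while (len > 0) {
		ssize_t n = write(fd, p, len);
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Read exactly len bytes; EOF before that counts as an error. */
static int read_all(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len > 0) {
		ssize_t n = read(fd, p, len);
		if (n < 0 && errno == EINTR)
			continue;
		if (n <= 0)
			return -1;
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

static int write_hdr(int fd, uint32_t type, const char *ns,
		     const char *name, uint64_t size)
{
	struct frame_hdr hdr;

	memset(&hdr, 0, sizeof(hdr));	/* also zeroes padding and names */
	hdr.type = type;
	hdr.size = size;
	strncpy(hdr.namespace, ns, NAME_LEN - 1);
	strncpy(hdr.name, name, NAME_LEN - 1);
	return write_all(fd, &hdr, sizeof(hdr));
}

/* Proxy side: opening header, image bytes, closing header. */
int send_image(int fd, const char *ns, const char *name,
	       const void *data, uint64_t size)
{
	assert(data != NULL || size == 0);
	if (write_hdr(fd, FRAME_OPEN, ns, name, size) < 0)
		return -1;
	if (write_all(fd, data, size) < 0)
		return -1;
	return write_hdr(fd, FRAME_CLOSE, ns, name, size);
}

/* Cache side: the size in the header tells where the closing header starts. */
int recv_image(int fd, struct frame_hdr *hdr, void *buf, size_t cap)
{
	struct frame_hdr end;

	if (read_all(fd, hdr, sizeof(*hdr)) < 0 || hdr->type != FRAME_OPEN)
		return -1;
	hdr->namespace[NAME_LEN - 1] = '\0';
	hdr->name[NAME_LEN - 1] = '\0';
	if (hdr->size > cap || read_all(fd, buf, hdr->size) < 0)
		return -1;
	if (read_all(fd, &end, sizeof(end)) < 0 || end.type != FRAME_CLOSE)
		return -1;
	end.name[NAME_LEN - 1] = '\0';
	return strcmp(end.name, hdr->name) ? -1 : 0;
}
```

Because the connection is never closed between images, the receiver cannot rely on
read() returning 0 anymore; the size field and the closing header take over that role.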

> 
> >>
> >>> +	remote_image *result;
> >>> +
> >>> +	while (1) {
> >>> +		result = get_rimg_by_name(namespace, path);
> >>> +
> >>> +		if (result != NULL) {
> >>> +			if (write_header(cli_fd, namespace, path) < 0) {
> >>> +				pr_perror("Error writing header for %s:%s",
> >>> +					path, namespace);
> >>> +				close(cli_fd);
> >>> +				return NULL;
> >>> +			}
> >>> +			return result;
> >>> +		}
> >>> +		/* The file does not exist and we do not expect new files */
> >>> +		if (finished && !putting) {
> >>> +			if (write_header(cli_fd, NULL_NAMESPACE, DUMP_FINISH) < 0) {
> >>> +				pr_perror("Error writing header for %s:%s",
> >>> +					DUMP_FINISH, NULL_NAMESPACE);
> >>> +			}
> >>> +			close(cli_fd);
> >>> +			return NULL;
> >>> +		}
> >>> +		/* The file does not exist but the request is for a parent file.
> >>> +		   A parent file may not exist for the first process. */
> >>> +		if (!putting && !strncmp(path, PARENT_IMG, PATHLEN)) {
> >>> +		    if (write_header(cli_fd, namespace, path) < 0) {
> >>> +			    pr_perror("Error writing header for %s:%s",
> >>> +					path, namespace);
> >>> +		    }
> >>> +		    close(cli_fd);
> >>> +		    return NULL;
> >>> +		}
> >>> +		sem_wait(&rimg_semph);
> >>> +	}
> >>> +}
> >>> +
> >>> +void* get_remote_image(void* fd)
> >>
> >> Should be static.
> > 
> > Right.
> > 
> >>
> >>> +{
> >>> +	int cli_fd = (long) fd;
> >>> +	remote_image* rimg = NULL;
> >>> +	char path_buf[PATHLEN];
> >>> +	char namespace_buf[PATHLEN];
> >>> +
> >>> +	if(read_header(cli_fd, namespace_buf, path_buf) < 0) {
> >>> +		pr_perror("Error reading header");
> >>> +		return NULL;
> >>> +	}
> >>> +
> >>> +	pr_info("Received GET for %s:%s.\n", path_buf, namespace_buf);
> >>> +
> >>> +	rimg = wait_for_image(cli_fd, namespace_buf, path_buf);
> >>> +	if (!rimg)
> >>> +		return NULL;
> >>> +
> >>> +	rimg->dst_fd = cli_fd;
> >>> +	send_remote_image(rimg->dst_fd, rimg->path, &rimg->buf_head, false);
> >>> +	return NULL;
> >>> +}
> >>> +
> >>> +void prepare_put_rimg()
> >>> +{
> >>> +	pthread_mutex_lock(&rimg_lock);
> >>> +	putting++;
> >>> +	pthread_mutex_unlock(&rimg_lock);
> >>> +}
> >>> +
> >>> +void finalize_put_rimg(remote_image* rimg)
> >>> +{
> >>> +	pthread_mutex_lock(&rimg_lock);
> >>> +	list_add_tail(&(rimg->l), &rimg_head);
> >>> +	putting--;
> >>> +	pthread_mutex_unlock(&rimg_lock);
> >>> +	sem_post(&rimg_semph);
> >>> +}
> >>> +
> >>> +int init_daemon(void *(*put_function) (void *))
> >>> +{
> >>> +	get_func = get_remote_image;
> >>> +	put_func = put_function;
> >>> +	return init_sync_structures();
> >>> +}
> >>> +
> >>> +int setup_TCP_server_socket(int port)
> >>> +{
> >>> +	struct sockaddr_in serv_addr;
> >>> +	int sockopt = 1;
> >>> +
> >>> +	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> >>> +	if (sockfd < 0) {
> >>> +		pr_perror("Unable to open image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	bzero((char *) &serv_addr, sizeof (serv_addr));
> >>> +	serv_addr.sin_family = AF_INET;
> >>> +	serv_addr.sin_addr.s_addr = INADDR_ANY;
> >>> +	serv_addr.sin_port = htons(port);
> >>> +
> >>> +	if (setsockopt(
> >>> +	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof (sockopt)) == -1) {
> >>> +		pr_perror("Unable to set SO_REUSEADDR");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
> >>> +		pr_perror("Unable to bind image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (listen(sockfd, DEFAULT_LISTEN)) {
> >>> +		pr_perror("Unable to listen image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	return sockfd;
> >>> +}
> >>> +
> >>> +int setup_TCP_client_socket(char* hostname, int port)
> >>> +{
> >>> +	int sockfd;
> >>> +	struct sockaddr_in serv_addr;
> >>> +	struct hostent *server;
> >>> +
> >>> +	sockfd = socket(AF_INET, SOCK_STREAM, 0);
> >>> +	if (sockfd < 0) {
> >>> +		pr_perror("Unable to open remote image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	server = gethostbyname(hostname);
> >>> +	if (server == NULL) {
> >>> +		pr_perror("Unable to get host by name (%s)", hostname);
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	bzero((char *) &serv_addr, sizeof (serv_addr));
> >>> +	serv_addr.sin_family = AF_INET;
> >>> +	bcopy((char *) server->h_addr,
> >>> +	      (char *) &serv_addr.sin_addr.s_addr,
> >>> +	      server->h_length);
> >>> +	serv_addr.sin_port = htons(port);
> >>> +
> >>> +	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> >>> +		pr_perror("Unable to connect to remote restore host %s", hostname);
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	return sockfd;
> >>> +}
> >>> +
> >>> +int setup_UNIX_server_socket(char* path)
> >>> +{
> >>> +	struct sockaddr_un addr;
> >>> +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> >>> +	if(sockfd < 0) {
> >>> +		pr_perror("Unable to open image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	memset(&addr, 0, sizeof(addr));
> >>> +	addr.sun_family = AF_UNIX;
> >>> +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> >>> +
> >>> +	unlink(path);
> >>> +
> >>> +	if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
> >>> +		pr_perror("Unable to bind image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	if (listen(sockfd, 50) == -1) {
> >>> +		pr_perror("Unable to listen image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	return sockfd;
> >>> +}
> >>> +
> >>> +int setup_UNIX_client_socket(char* path)
> >>> +{
> >>> +	struct sockaddr_un addr;
> >>> +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> >>> +	if (sockfd < 0) {
> >>> +		pr_perror("Unable to open local image socket");
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	memset(&addr, 0, sizeof(addr));
> >>> +	addr.sun_family = AF_UNIX;
> >>> +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> >>> +
> >>> +	if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
> >>> +		pr_perror("Unable to connect to local socket: %s", path);
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	return sockfd;
> >>> +}
> >>> +
> >>> +int pb_write_obj(int fd, void* obj, int type)
> >>
> >> These pb_* helpers are not used outside the .c file, so should be static
> >> and removed from headers.
> > 
> > These two helpers are also used inside image-remote.c to send namespaces.
> 
> Ah, right...
> 
> >>
> >>> +{
> >>> +	struct cr_img img;
> >>> +	img._x.fd = fd;
> >>> +	bfd_setraw(&img._x);
> >>> +	return pb_write_one(&img, obj, type);
> >>> +}
> >>> +
> >>> +int pb_read_obj(int fd, void** pobj, int type)
> >>> +{
> >>> +	struct cr_img img;
> >>> +	img._x.fd = fd;
> >>> +	bfd_setraw(&img._x);
> >>> +	return do_pb_read_one(&img, pobj, type, true);
> >>> +}
> >>> +
> >>> +int write_header(int fd, char* namespace, char* path)
> >>> +{
> >>> +	RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
> >>> +	ri.name = path;
> >>> +	ri.namespace_ = namespace;
> >>> +	return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
> >>> +}
> >>> +
> >>> +int read_header(int fd, char* namespace, char* path)
> >>> +{
> >>> +	RemoteImageEntry* ri;
> >>> +	int ret = pb_read_obj(fd, (void**)&ri, PB_REMOTE_IMAGE);
> >>> +	if (ret) {
> >>> +		strncpy(namespace, ri->namespace_, PATHLEN);
> >>> +		strncpy(path, ri->name, PATHLEN);
> >>> +	}
> >>> +	free(ri);
> >>> +	return ret;
> >>> +}
> >>> +
> >>> +static void add_worker(pthread_t tid)
> >>> +{
> >>> +	worker_thread* wthread = malloc(sizeof(worker_thread));
> >>> +	if(!wthread) {
> >>> +		pr_perror("Unable to allocate worker thread structure");
> >>> +	}
> >>> +	wthread->tid = tid;
> >>> +	pthread_mutex_lock(&workers_lock);
> >>> +	list_add_tail(&(wthread->l), &workers_head);
> >>> +	pthread_mutex_unlock(&workers_lock);
> >>> +	sem_post(&workers_semph);
> >>> +}
> >>> +
> >>> +void join_workers()
> >>> +{
> >>> +	worker_thread* wthread = NULL;
> >>> +	while (1) {
> >>> +	    if (list_empty(&workers_head)) {
> >>> +		    sem_wait(&workers_semph);
> >>> +		    continue;
> >>> +	    }
> >>> +	    wthread = list_entry(workers_head.next, worker_thread, l);
> >>> +	    if (pthread_join(wthread->tid, NULL)) {
> >>> +		    pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
> >>> +	    }
> >>> +	    else {
> >>> +		    list_del(&(wthread->l));
> >>> +		    free(wthread);
> >>> +	    }
> >>> +
> >>> +	}
> >>> +}
> >>> +
> >>> +void* accept_get_image_connections(void* port)
> >>> +{
> >>> +	socklen_t clilen;
> >>> +	long cli_fd;
> >>> +	pthread_t tid;
> >>> +	int get_fd = *((int*) port);
> >>> +	struct sockaddr_in cli_addr;
> >>> +	clilen = sizeof (cli_addr);
> >>> +
> >>> +	while (1) {
> >>> +		cli_fd = accept(get_fd, (struct sockaddr *) &cli_addr, &clilen);
> >>> +		if (cli_fd < 0) {
> >>> +			pr_perror("Unable to accept get image connection");
> >>> +			return NULL;
> >>> +		}
> >>> +
> >>> +		if (pthread_create(
> >>> +		    &tid, NULL, get_func, (void*) cli_fd)) {
> >>> +			pr_perror("Unable to create put thread");
> >>> +			return NULL;
> >>> +		}
> >>> +
> >>> +		add_worker(tid);
> >>> +	}
> >>> +}
> >>> +
> >>> +void* accept_put_image_connections(void* port)
> >>> +{
> >>> +	socklen_t clilen;
> >>> +	int cli_fd;
> >>> +	pthread_t tid;
> >>> +	int put_fd = *((int*) port);
> >>> +	struct sockaddr_in cli_addr;
> >>> +	clilen = sizeof(cli_addr);
> >>> +	char path_buf[PATHLEN];
> >>> +	char namespace_buf[PATHLEN];
> >>> +
> >>> +	while (1) {
> >>> +
> >>> +		cli_fd = accept(put_fd, (struct sockaddr *) &cli_addr, &clilen);
> >>> +		if (cli_fd < 0) {
> >>> +			pr_perror("Unable to accept put image connection");
> >>> +			return NULL;
> >>> +		}
> >>> +
> >>> +		if (read_header(cli_fd, namespace_buf, path_buf) < 0) {
> >>> +		    pr_perror("Error reading header");
> >>> +		    continue;
> >>> +		}
> >>> +
> >>> +		remote_image* rimg = get_rimg_by_name(namespace_buf, path_buf);
> >>> +
> >>> +		pr_info("Reveiced PUT request for %s:%s\n", path_buf, namespace_buf);
> >>> +
> >>> +		if (rimg == NULL) {
> >>> +			rimg = malloc(sizeof (remote_image));
> >>> +			if (rimg == NULL) {
> >>> +				pr_perror("Unable to allocate remote_image structures");
> >>> +				return NULL;
> >>> +			}
> >>> +
> >>> +			remote_buffer* buf = malloc(sizeof (remote_buffer));
> >>> +			if (buf == NULL) {
> >>> +				pr_perror("Unable to allocate remote_buffer structures");
> >>> +				return NULL;
> >>> +			}
> >>> +
> >>> +			strncpy(rimg->path, path_buf, PATHLEN);
> >>> +			strncpy(rimg->namespace, namespace_buf, PATHLEN);
> >>> +			buf->nbytes = 0;
> >>> +			INIT_LIST_HEAD(&(rimg->buf_head));
> >>> +			list_add_tail(&(buf->l), &(rimg->buf_head));
> >>> +		}
> >>> +		/* NOTE: we implement a PUT by clearing the previous file. */
> >>> +		else {
> >>> +		    pr_info("Clearing previous images for %s:%s\n",
> >>> +			    path_buf, namespace_buf);
> >>> +			pthread_mutex_lock(&rimg_lock);
> >>> +			list_del(&(rimg->l));
> >>> +			pthread_mutex_unlock(&rimg_lock);
> >>> +			while (!list_is_singular(&(rimg->buf_head))) {
> >>> +				list_del(rimg->buf_head.prev);
> >>> +			}
> >>> +			list_entry(rimg->buf_head.next, remote_buffer, l)->nbytes = 0;
> >>> +		}
> >>> +		rimg->src_fd = cli_fd;
> >>> +		rimg->dst_fd = -1;
> >>> +
> >>> +		if (pthread_create(
> >>> +		    &tid, NULL, put_func, (void*) rimg)) {
> >>> +			pr_perror("Unable to create put thread");
> >>> +			return NULL;
> >>> +		}
> >>> +
> >>> +		pr_info("Serving PUT request for %s:%s (tid=%lu)\n",
> >>> +			rimg->path, rimg->namespace, (unsigned long) tid);
> >>> +
> >>> +		add_worker(tid);
> >>> +
> >>> +		if (!strncmp(path_buf, DUMP_FINISH, sizeof (DUMP_FINISH))) {
> >>> +			finished = 1;
> >>> +			pr_info("Received DUMP FINISH\n");
> >>> +			sem_post(&rimg_semph);
> >>> +		}
> >>> +	}
> >>> +}
> >>> +
> >>> +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> >>
> >> This routine makes sense only on the cache side, am I right? If yes, it would be nice
> >> to split the code into different .c files so that when reading the code it is clear
> >> which one of the cache-proxy pair we're at.
> > 
> > No, it is also used by the image-proxy to receive images from CRIU dump/predump.
> > 
> >>
> >>> +{
> >>> +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> >>> +	int n, nblocks = 0;
> >>> +	uint64_t image_bytes = 0;
> >>> +	uint64_t recv_bytes = 0;
> >>> +
> >>> +	if (image_check && (read(fd, &image_bytes, sizeof(uint64_t)) != sizeof(uint64_t))) {
> >>> +		pr_perror("Unable to read size of the image %s", path);
> >>> +		close(fd);
> >>> +		return -1;
> >>> +	}
> >>> +
> >>> +	while(1) {
> >>> +		n = read(fd,
> >>> +			 curr_buf->buffer + curr_buf->nbytes,
> >>> +			 BUF_SIZE - curr_buf->nbytes);
> >>> +		if (n == 0 && image_check && image_bytes != recv_bytes) {
> >>> +			pr_info("Failed receiving %s (received %lu instead of %lu)\n",
> >>> +			    path, recv_bytes, image_bytes);
> >>> +			close(fd);
> >>> +			return -1;
> >>> +		}
> >>> +		else if (n == 0) {
> >>> +			pr_info("Finished receiving %s (%d full blocks, %d bytes on last block)\n",
> >>> +			    path, nblocks, curr_buf->nbytes);
> >>> +			close(fd);
> >>> +			return recv_bytes;
> >>> +		}
> >>> +		else if (n > 0) {
> >>> +			curr_buf->nbytes += n;
> >>> +			recv_bytes = nblocks*BUF_SIZE + curr_buf->nbytes;
> >>> +			if(curr_buf->nbytes == BUF_SIZE) {
> >>> +				remote_buffer* buf = malloc(sizeof(remote_buffer));
> >>> +				if (buf == NULL) {
> >>> +					pr_perror("Unable to allocate remote_buffer structures");
> >>> +					close(fd);
> >>> +					return -1;
> >>> +				}
> >>> +				buf->nbytes = 0;
> >>> +				list_add_tail(&(buf->l), rbuff_head);
> >>> +				curr_buf = buf;
> >>> +				nblocks++;
> >>> +			}
> >>> +		}
> >>> +		else {
> >>> +			pr_perror("Read on %s socket failed", path);
> >>> +			close(fd);
> >>> +			return -1;
> >>> +		}
> >>> +	}
> >>> +}
> >>> +
> >>> +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
> >>
> >> And this one is opposite -- for proxy.
> > 
> > Same here. This function is used inside proxy to forward images but it is also 
> > used to send images to CRIU restore (from the cache).
> > 
> >>
> >>> +{
> >>> +	remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> >>> +	int n, curr_offset, nblocks;
> >>> +
> >>> +	nblocks = 0;
> >>> +	curr_offset = 0;
> >>> +
> >>> +	if (image_check) {
> >>> +		uint64_t image_bytes = sizeof_remote_buffer(rbuff_head);
> >>> +		n = send(fd, &image_bytes, sizeof(uint64_t), MSG_NOSIGNAL);
> >>> +		if (n < sizeof(uint64_t)) {
> >>> +			pr_perror("Unable to send size of the image %s", path);
> >>> +			close(fd);
> >>> +			return -1;
> >>> +		}
> >>> +	}
> >>> +
> >>> +	while(1) {
> >>> +		n = send(
> >>> +		    fd,
> >>> +		    curr_buf->buffer + curr_offset,
> >>> +		    MIN(BUF_SIZE, curr_buf->nbytes) - curr_offset,
> >>> +		    MSG_NOSIGNAL);
> >>> +		if (n > -1) {
> >>> +			curr_offset += n;
> >>> +			if (curr_offset == BUF_SIZE) {
> >>> +				curr_buf =
> >>> +				    list_entry(curr_buf->l.next, remote_buffer, l);
> >>> +				nblocks++;
> >>> +				curr_offset = 0;
> >>> +			}
> >>> +			else if (curr_offset == curr_buf->nbytes) {
> >>> +				pr_info("Finished forwarding %s (%d full blocks, %d bytes on last block)\n",
> >>> +					path, nblocks, curr_offset);
> >>> +				close(fd);
> >>> +			       return nblocks*BUF_SIZE + curr_buf->nbytes;
> >>> +			}
> >>> +		}
> >>> +		else if (errno == EPIPE || errno == ECONNRESET) {
> >>> +			pr_warn("Connection for %s was closed early than expected\n",
> >>> +				path);
> >>> +			return 0;
> >>> +		}
> >>> +		else {
> >>> +			pr_perror("Write on %s socket failed", path);
> >>> +			return -1;
> >>> +		}
> >>> +	}
> >>> diff --git a/include/image-remote-pvt.h b/include/image-remote-pvt.h
> >>> new file mode 100644
> >>> index 0000000..0a20e6d
> >>> --- /dev/null
> >>> +++ b/include/image-remote-pvt.h
> >>> @@ -0,0 +1,52 @@
> >>> +#ifndef IMAGE_REMOTE_PVT_H
> >>> +#define	IMAGE_REMOTE_PVT_H
> >>> +
> >>> +#include <stdbool.h>
> >>> +#include "list.h"
> >>> +#include "image-remote.h"
> >>> +
> >>> +#define DEFAULT_LISTEN 50
> >>> +#define PAGESIZE 4096
> >>> +#define BUF_SIZE PAGESIZE
> >>> +
> >>> +typedef struct rbuf {
> >>> +	char buffer[BUF_SIZE];
> >>> +	int nbytes; /* How many bytes are in the buffer. */
> >>> +	struct list_head l;
> >>> +} remote_buffer;
> >>
> >> Minor, but still. I'd prefer plain structures as datatypes for objects that
> >> are used internally in some subsystem.
> > 
> > Sorry, I couldn't get the idea...
> 
> I meant that typedefs are not necessary for structures that are used
> internally by some subsystem. Just reading the struct keyword makes
> it easier to understand :)

Oh, I get it. I will fix that.
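
As a sketch of what that could look like (not the final code): the buffer type
declared as a plain struct, so every use site spells out "struct". The struct
list_head here is a minimal stand-in for the one in CRIU's list.h, and
remote_buffer_init is a hypothetical helper.

```c
#include <assert.h>
#include <stddef.h>

#define BUF_SIZE 4096

/* Minimal stand-in for struct list_head from list.h. */
struct list_head {
	struct list_head *next, *prev;
};

/* Plain struct, no typedef. */
struct remote_buffer {
	char buffer[BUF_SIZE];
	int nbytes;		/* How many bytes are in the buffer. */
	struct list_head l;
};

/* Hypothetical helper: start with an empty buffer and a self-linked node. */
void remote_buffer_init(struct remote_buffer *buf)
{
	assert(buf != NULL);
	buf->nbytes = 0;
	buf->l.next = &buf->l;
	buf->l.prev = &buf->l;
}
```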

My schedule is very tight until the end of next week. Give me a couple of
days to send a new version of the patch.

> 
> -- Pavel


-- 
Rodrigo Bruno <rbruno at gsd.inesc-id.pt>

