[CRIU] [PATCH] Merge img-remote and img-remote-proto

Mike Rapoport rppt at linux.vnet.ibm.com
Thu Jul 6 10:53:54 MSK 2017


Ping?

On Sat, Jul 01, 2017 at 06:42:21PM +0300, Mike Rapoport wrote:
> From: Omri Kramer <omri.kramer at gmail.com>
> 
> There is no real need to have both.
> 
> Signed-off-by: Omri Kramer <omri.kramer at gmail.com>
> Signed-off-by: Lior Fisch <fischlior at gmail.com>
> Reviewed-by: Mike Rapoport <rppt at linux.vnet.ibm.com>
> ---
>  criu/Makefile.crtools           |   1 -
>  criu/img-cache.c                |   2 +-
>  criu/img-proxy.c                |   2 +-
>  criu/img-remote-proto.c         | 726 ----------------------------------------
>  criu/img-remote.c               | 719 ++++++++++++++++++++++++++++++++++++++-
>  criu/include/img-remote-proto.h |  88 -----
>  criu/include/img-remote.h       |  75 +++++
>  7 files changed, 794 insertions(+), 819 deletions(-)
>  delete mode 100644 criu/img-remote-proto.c
>  delete mode 100644 criu/include/img-remote-proto.h
> 
> diff --git a/criu/Makefile.crtools b/criu/Makefile.crtools
> index 9ce0652..b0cbeff 100644
> --- a/criu/Makefile.crtools
> +++ b/criu/Makefile.crtools
> @@ -34,7 +34,6 @@ obj-y			+= image.o
>  obj-y			+= img-remote.o
>  obj-y			+= img-proxy.o
>  obj-y			+= img-cache.o
> -obj-y			+= img-remote-proto.o
>  obj-y			+= ipc_ns.o
>  obj-y			+= irmap.o
>  obj-y			+= kcmp-ids.o
> diff --git a/criu/img-cache.c b/criu/img-cache.c
> index 2935970..7020a30 100644
> --- a/criu/img-cache.c
> +++ b/criu/img-cache.c
> @@ -1,6 +1,6 @@
>  #include <unistd.h>
> 
> -#include "img-remote-proto.h"
> +#include "img-remote.h"
>  #include "criu-log.h"
>  #include <pthread.h>
>  #include <sys/socket.h>
> diff --git a/criu/img-proxy.c b/criu/img-proxy.c
> index 58123dc..f56073b 100644
> --- a/criu/img-proxy.c
> +++ b/criu/img-proxy.c
> @@ -1,7 +1,7 @@
>  #include <unistd.h>
> 
>  #include "img-remote.h"
> -#include "img-remote-proto.h"
> +#include "img-remote.h"
>  #include "criu-log.h"
>  #include <pthread.h>
>  #include <fcntl.h>
> diff --git a/criu/img-remote-proto.c b/criu/img-remote-proto.c
> deleted file mode 100644
> index a5450e6..0000000
> --- a/criu/img-remote-proto.c
> +++ /dev/null
> @@ -1,726 +0,0 @@
> -#include <unistd.h>
> -#include <stdlib.h>
> -
> -#include <semaphore.h>
> -#include <sys/socket.h>
> -#include <netinet/in.h>
> -#include <netdb.h>
> -#include "sys/un.h"
> -#include <pthread.h>
> -#include <fcntl.h>
> -#include <sys/file.h>
> -
> -#include "img-remote-proto.h"
> -#include "criu-log.h"
> -#include "common/compiler.h"
> -
> -#include "protobuf.h"
> -#include "images/remote-image.pb-c.h"
> -#include "image.h"
> -
> -LIST_HEAD(rimg_head);
> -pthread_mutex_t rimg_lock = PTHREAD_MUTEX_INITIALIZER;
> -
> -pthread_mutex_t proxy_to_cache_lock = PTHREAD_MUTEX_INITIALIZER;
> -
> -LIST_HEAD(workers_head);
> -pthread_mutex_t workers_lock = PTHREAD_MUTEX_INITIALIZER;
> -sem_t workers_semph;
> -
> -struct rimage * (*wait_for_image) (struct wthread *wt);
> -
> -bool finished = false;
> -int writing = 0;
> -int forwarding = 0;
> -int proxy_to_cache_fd;
> -int local_req_fd;
> -
> -struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path)
> -{
> -	struct rimage *rimg = NULL;
> -
> -	pthread_mutex_lock(&rimg_lock);
> -	list_for_each_entry(rimg, &rimg_head, l) {
> -		if (!strncmp(rimg->path, path, PATHLEN) &&
> -		    !strncmp(rimg->snapshot_id, snapshot_id, PATHLEN)) {
> -			pthread_mutex_unlock(&rimg_lock);
> -			return rimg;
> -		}
> -	}
> -	pthread_mutex_unlock(&rimg_lock);
> -	return NULL;
> -}
> -
> -static struct wthread *get_wt_by_name(const char *snapshot_id, const char *path)
> -{
> -	struct wthread *wt = NULL;
> -
> -	pthread_mutex_lock(&workers_lock);
> -	list_for_each_entry(wt, &workers_head, l) {
> -		if (!strncmp(wt->path, path, PATHLEN) &&
> -		   !strncmp(wt->snapshot_id, snapshot_id, PATHLEN)) {
> -			pthread_mutex_unlock(&workers_lock);
> -			return wt;
> -		}
> -	}
> -	pthread_mutex_unlock(&workers_lock);
> -	return NULL;
> -}
> -
> -static int init_sync_structures(void)
> -{
> -	if (sem_init(&workers_semph, 0, 0) != 0) {
> -		pr_perror("Workers semaphore init failed");
> -		return -1;
> -	}
> -
> -	return 0;
> -}
> -
> -void prepare_recv_rimg(void)
> -{
> -	pthread_mutex_lock(&rimg_lock);
> -	writing++;
> -	pthread_mutex_unlock(&rimg_lock);
> -}
> -
> -void finalize_recv_rimg(struct rimage *rimg)
> -{
> -
> -	pthread_mutex_lock(&rimg_lock);
> -
> -	if (rimg)
> -		list_add_tail(&(rimg->l), &rimg_head);
> -	writing--;
> -	pthread_mutex_unlock(&rimg_lock);
> -	/* Wake thread waiting for this image. */
> -	if (rimg) {
> -		struct wthread *wt = get_wt_by_name(rimg->snapshot_id, rimg->path);
> -		if (wt)
> -			sem_post(&(wt->wakeup_sem));
> -	}
> -}
> -
> -bool is_receiving(void)
> -{
> -	int ret;
> -
> -	pthread_mutex_lock(&rimg_lock);
> -	ret = writing;
> -	pthread_mutex_unlock(&rimg_lock);
> -	return ret > 0;
> -}
> -
> -static void prepare_fwd_rimg(void)
> -{
> -	pthread_mutex_lock(&rimg_lock);
> -	forwarding++;
> -	pthread_mutex_unlock(&rimg_lock);
> -}
> -
> -static void finalize_fwd_rimg(void)
> -{
> -	pthread_mutex_lock(&rimg_lock);
> -	forwarding--;
> -	pthread_mutex_unlock(&rimg_lock);
> -}
> -
> -static bool is_forwarding(void)
> -{
> -	int ret;
> -
> -	pthread_mutex_lock(&rimg_lock);
> -	ret = forwarding;
> -	pthread_mutex_unlock(&rimg_lock);
> -	return ret > 0;
> -}
> -
> -/* This function is called when no more images are coming. Threads still waiting
> - * for images will be awaken to send a ENOENT (no such file) to the requester.
> - */
> -void unlock_workers(void)
> -{
> -	struct wthread *wt = NULL;
> -
> -	pthread_mutex_lock(&workers_lock);
> -	list_for_each_entry(wt, &workers_head, l)
> -		sem_post(&(wt->wakeup_sem));
> -	pthread_mutex_unlock(&workers_lock);
> -}
> -
> -int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*))
> -{
> -	if (background) {
> -		if (daemon(1, 0) == -1) {
> -			pr_perror("Can't run service server in the background");
> -			return -1;
> -		}
> -	}
> -	wait_for_image = wfi;
> -	return init_sync_structures();
> -}
> -
> -int setup_TCP_server_socket(int port)
> -{
> -	struct sockaddr_in serv_addr;
> -	int sockopt = 1;
> -	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> -
> -	if (sockfd < 0) {
> -		pr_perror("Unable to open image socket");
> -		return -1;
> -	}
> -
> -	bzero((char *) &serv_addr, sizeof(serv_addr));
> -	serv_addr.sin_family = AF_INET;
> -	serv_addr.sin_addr.s_addr = INADDR_ANY;
> -	serv_addr.sin_port = htons(port);
> -
> -	if (setsockopt(
> -	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) == -1) {
> -		pr_perror("Unable to set SO_REUSEADDR");
> -		goto err;
> -	}
> -
> -	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> -		pr_perror("Unable to bind image socket");
> -		goto err;
> -	}
> -
> -	if (listen(sockfd, DEFAULT_LISTEN)) {
> -		pr_perror("Unable to listen image socket");
> -		goto err;
> -	}
> -
> -	return sockfd;
> -err:
> -	close(sockfd);
> -	return -1;
> -}
> -
> -int setup_TCP_client_socket(char *hostname, int port)
> -{
> -	int sockfd;
> -	struct sockaddr_in serv_addr;
> -	struct hostent *server;
> -
> -	sockfd = socket(AF_INET, SOCK_STREAM, 0);
> -	if (sockfd < 0) {
> -		pr_perror("Unable to open remote image socket");
> -		return -1;
> -	}
> -
> -	server = gethostbyname(hostname);
> -	if (server == NULL) {
> -		pr_perror("Unable to get host by name (%s)", hostname);
> -		goto err;
> -	}
> -
> -	bzero((char *) &serv_addr, sizeof(serv_addr));
> -	serv_addr.sin_family = AF_INET;
> -	bcopy((char *) server->h_addr,
> -	      (char *) &serv_addr.sin_addr.s_addr,
> -	      server->h_length);
> -	serv_addr.sin_port = htons(port);
> -
> -	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> -		pr_perror("Unable to connect to remote %s", hostname);
> -		goto err;
> -	}
> -
> -	return sockfd;
> -err:
> -	close(sockfd);
> -	return -1;
> -}
> -
> -int setup_UNIX_server_socket(char *path)
> -{
> -	struct sockaddr_un addr;
> -	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> -
> -	if (sockfd < 0) {
> -		pr_perror("Unable to open image socket");
> -		return -1;
> -	}
> -
> -	memset(&addr, 0, sizeof(addr));
> -	addr.sun_family = AF_UNIX;
> -	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> -
> -	unlink(path);
> -
> -	if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
> -		pr_perror("Unable to bind image socket");
> -		goto err;
> -	}
> -
> -	if (listen(sockfd, 50) == -1) {
> -		pr_perror("Unable to listen image socket");
> -		goto err;
> -	}
> -
> -	return sockfd;
> -err:
> -	close(sockfd);
> -	return -1;
> -}
> -
> -int setup_UNIX_client_socket(char *path)
> -{
> -	struct sockaddr_un addr;
> -	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> -
> -	if (sockfd < 0) {
> -		pr_perror("Unable to open local image socket");
> -		return -1;
> -	}
> -
> -	memset(&addr, 0, sizeof(addr));
> -	addr.sun_family = AF_UNIX;
> -	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> -
> -	if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
> -		pr_perror("Unable to connect to local socket: %s", path);
> -		close(sockfd);
> -		return -1;
> -	}
> -
> -	return sockfd;
> -}
> -
> -int64_t pb_write_obj(int fd, void *obj, int type)
> -{
> -	struct cr_img img;
> -
> -	img._x.fd = fd;
> -	bfd_setraw(&img._x);
> -	return pb_write_one(&img, obj, type);
> -}
> -
> -int64_t pb_read_obj(int fd, void **pobj, int type)
> -{
> -	struct cr_img img;
> -
> -	img._x.fd = fd;
> -	bfd_setraw(&img._x);
> -	return do_pb_read_one(&img, pobj, type, true);
> -}
> -
> -int64_t write_header(int fd, char *snapshot_id, char *path, int flags)
> -{
> -	LocalImageEntry li = LOCAL_IMAGE_ENTRY__INIT;
> -
> -	li.name = path;
> -	li.snapshot_id = snapshot_id;
> -	li.open_mode = flags;
> -	return pb_write_obj(fd, &li, PB_LOCAL_IMAGE);
> -}
> -
> -int64_t write_reply_header(int fd, int error)
> -{
> -	LocalImageReplyEntry lir = LOCAL_IMAGE_REPLY_ENTRY__INIT;
> -
> -	lir.error = error;
> -	return pb_write_obj(fd, &lir, PB_LOCAL_IMAGE_REPLY);
> -}
> -
> -int64_t write_remote_header(int fd, char *snapshot_id, char *path, int flags, uint64_t size)
> -{
> -	RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
> -
> -	ri.name = path;
> -	ri.snapshot_id = snapshot_id;
> -	ri.open_mode = flags;
> -	ri.size = size;
> -	return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
> -}
> -
> -int64_t read_header(int fd, char *snapshot_id, char *path, int *flags)
> -{
> -	LocalImageEntry *li;
> -	int ret = pb_read_obj(fd, (void **)&li, PB_LOCAL_IMAGE);
> -
> -	if (ret > 0) {
> -		strncpy(snapshot_id, li->snapshot_id, PATHLEN);
> -		strncpy(path, li->name, PATHLEN);
> -		*flags = li->open_mode;
> -	}
> -	free(li);
> -	return ret;
> -}
> -
> -int64_t read_reply_header(int fd, int *error)
> -{
> -	LocalImageReplyEntry *lir;
> -	int ret = pb_read_obj(fd, (void **)&lir, PB_LOCAL_IMAGE_REPLY);
> -
> -	if (ret > 0)
> -		*error = lir->error;
> -	free(lir);
> -	return ret;
> -}
> -
> -int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *flags, uint64_t *size)
> -{
> -	RemoteImageEntry *ri;
> -	int ret = pb_read_obj(fd, (void **)&ri, PB_REMOTE_IMAGE);
> -
> -	if (ret > 0) {
> -		strncpy(snapshot_id, ri->snapshot_id, PATHLEN);
> -		strncpy(path, ri->name, PATHLEN);
> -		*flags = ri->open_mode;
> -		*size = ri->size;
> -	}
> -	free(ri);
> -	return ret;
> -}
> -
> -static struct wthread *new_worker(void)
> -{
> -	struct wthread *wt = malloc(sizeof(struct wthread));
> -
> -	if (!wt) {
> -		pr_perror("Unable to allocate worker thread structure");
> -		goto err;
> -	}
> -	if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
> -		pr_perror("Workers semaphore init failed");
> -		goto err;
> -	}
> -	return wt;
> -err:
> -	free(wt);
> -	return NULL;
> -}
> -
> -static void add_worker(struct wthread *wt)
> -{
> -	pthread_mutex_lock(&workers_lock);
> -	list_add_tail(&(wt->l), &workers_head);
> -	pthread_mutex_unlock(&workers_lock);
> -	sem_post(&workers_semph);
> -}
> -
> -void join_workers(void)
> -{
> -	struct wthread *wthread = NULL;
> -
> -	while (! list_empty(&workers_head)) {
> -		wthread = list_entry(workers_head.next, struct wthread, l);
> -		pthread_join(wthread->tid, NULL);
> -		list_del(&(wthread->l));
> -		free(wthread);
> -	}
> -}
> -
> -static struct rimage *new_remote_image(char *path, char *snapshot_id)
> -{
> -	struct rimage *rimg = malloc(sizeof(struct rimage));
> -	struct rbuf *buf = malloc(sizeof(struct rbuf));
> -
> -	if (rimg == NULL || buf == NULL) {
> -		pr_perror("Unable to allocate remote image structures");
> -		goto err;
> -	}
> -
> -	strncpy(rimg->path, path, PATHLEN -1 );
> -	rimg->path[PATHLEN - 1] = '\0';
> -	strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
> -	rimg->snapshot_id[PATHLEN - 1] = '\0';
> -	rimg->size = 0;
> -	buf->nbytes = 0;
> -	INIT_LIST_HEAD(&(rimg->buf_head));
> -	list_add_tail(&(buf->l), &(rimg->buf_head));
> -	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -	rimg->curr_sent_bytes = 0;
> -
> -	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> -		pr_err("Remote image in_use mutex init failed\n");
> -		goto err;
> -	}
> -	return rimg;
> -err:
> -	free(rimg);
> -	free(buf);
> -	return NULL;
> -}
> -
> -/* Clears a remote image struct for reusing it. */
> -static struct rimage *clear_remote_image(struct rimage *rimg)
> -{
> -	pthread_mutex_lock(&(rimg->in_use));
> -
> -	while (!list_is_singular(&(rimg->buf_head))) {
> -		struct rbuf *buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> -
> -		list_del(rimg->buf_head.prev);
> -		free(buf);
> -	}
> -
> -	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> -	rimg->size = 0;
> -	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -	rimg->curr_sent_bytes = 0;
> -
> -	pthread_mutex_unlock(&(rimg->in_use));
> -
> -	return rimg;
> -}
> -
> -struct rimage *prepare_remote_image(char *path, char *snapshot_id, int open_mode)
> -{
> -	struct rimage *rimg = get_rimg_by_name(snapshot_id, path);
> -	/* There is no record of such image, create a new one. */
> -
> -	if (rimg == NULL)
> -		return new_remote_image(path, snapshot_id);
> -
> -	pthread_mutex_lock(&rimg_lock);
> -	list_del(&(rimg->l));
> -	pthread_mutex_unlock(&rimg_lock);
> -
> -	/* There is already an image record. Simply return it for appending. */
> -	if (open_mode == O_APPEND)
> -		return rimg;
> -	/* There is already an image record. Clear it for writing. */
> -	else
> -		return clear_remote_image(rimg);
> -}
> -
> -void *process_local_read(struct wthread *wt)
> -{
> -	struct rimage *rimg = NULL;
> -	int64_t ret;
> -	/* TODO - split wait_for_image
> -	 * in cache - improve the parent stuf
> -	 * in proxy - do not wait for anything, return no file
> -	 */
> -	rimg = wait_for_image(wt);
> -	if (!rimg) {
> -		pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
> -		if (write_reply_header(wt->fd, ENOENT) < 0)
> -			pr_perror("Error writing reply header for unexisting image");
> -		close(wt->fd);
> -		return NULL;
> -	} else {
> -		if (write_reply_header(wt->fd, 0) < 0) {
> -			pr_perror("Error writing reply header for %s:%s",
> -					wt->path, wt->snapshot_id);
> -			close(wt->fd);
> -			return NULL;
> -		}
> -	}
> -
> -	pthread_mutex_lock(&(rimg->in_use));
> -	ret = send_image(wt->fd, rimg, wt->flags, true);
> -	if (ret < 0)
> -		pr_perror("Unable to send %s:%s to CRIU (sent %ld bytes)",
> -				rimg->path, rimg->snapshot_id, (long)ret);
> -	else
> -		pr_info("Finished sending %s:%s to CRIU (sent %ld bytes)\n",
> -				rimg->path, rimg->snapshot_id, (long)ret);
> -	pthread_mutex_unlock(&(rimg->in_use));
> -	return NULL;
> -}
> -
> -static void *process_local_image_connection(void *ptr)
> -{
> -	struct wthread *wt = (struct wthread *) ptr;
> -	struct rimage *rimg = NULL;
> -	int64_t ret;
> -
> -	/* NOTE: the code inside this if is shared for both cache and proxy. */
> -	if (wt->flags == O_RDONLY)
> -		return process_local_read(wt);
> -
> -	/* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
> -	 * through TCP (see accept_remote_image_connections).
> -	 */
> -	rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
> -	ret = recv_image(wt->fd, rimg, 0, wt->flags, true);
> -	if (ret < 0) {
> -		pr_perror("Unable to receive %s:%s to CRIU (received %ld bytes)",
> -				rimg->path, rimg->snapshot_id, (long)ret);
> -		finalize_recv_rimg(NULL);
> -		return NULL;
> -	}
> -	finalize_recv_rimg(rimg);
> -	pr_info("Finished receiving %s:%s (received %ld bytes)\n",
> -			rimg->path, rimg->snapshot_id, (long)ret);
> -
> -
> -	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
> -		finished = true;
> -		shutdown(local_req_fd, SHUT_RD);
> -	} else {
> -		pthread_mutex_lock(&proxy_to_cache_lock);
> -		ret = forward_image(rimg);
> -		pthread_mutex_unlock(&proxy_to_cache_lock);
> -	}
> -
> -	finalize_fwd_rimg();
> -	if (ret < 0) {
> -		pr_perror("Unable to forward %s:%s to Image Cache",
> -				rimg->path, rimg->snapshot_id);
> -
> -		return NULL;
> -	}
> -
> -	if (finished && !is_forwarding() && !is_receiving()) {
> -		pr_info("Closing connection to Image Cache.\n");
> -		close(proxy_to_cache_fd);
> -		unlock_workers();
> -	}
> -	return NULL;
> -}
> -
> -
> -void *accept_local_image_connections(void *port)
> -{
> -	int fd = *((int *) port);
> -	int cli_fd;
> -	struct sockaddr_in cli_addr;
> -
> -	socklen_t clilen = sizeof(cli_addr);
> -	pthread_t tid;
> -	struct wthread *wt;
> -
> -	while (1) {
> -		cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
> -		if (cli_fd < 0) {
> -			if (!finished)
> -				pr_perror("Unable to accept local image connection");
> -			close(cli_fd);
> -			return NULL;
> -		}
> -
> -		wt = new_worker();
> -		wt->fd = cli_fd;
> -
> -		if (read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags)) < 0) {
> -			pr_err("Error reading local image header\n");
> -			goto err;
> -		}
> -
> -		pr_info("Received %s request for %s:%s\n",
> -		    wt->flags == O_RDONLY ? "read" :
> -			wt->flags == O_APPEND ? "append" : "write",
> -		    wt->path, wt->snapshot_id);
> -
> -		/* These function calls are used to avoid other threads from
> -		 * thinking that there are no more images are coming.
> -		 */
> -		if (wt->flags != O_RDONLY) {
> -			prepare_recv_rimg();
> -			prepare_fwd_rimg();
> -		}
> -
> -		if (pthread_create(
> -		    &tid, NULL, process_local_image_connection, (void *) wt)) {
> -			pr_perror("Unable to create worker thread");
> -			goto err;
> -		}
> -
> -		wt->tid = tid;
> -		add_worker(wt);
> -	}
> -err:
> -	close(cli_fd);
> -	free(wt);
> -	return NULL;
> -}
> -
> -/* Note: size is a limit on how much we want to read from the socket.  Zero means
> - * read until the socket is closed.
> - */
> -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
> -{
> -	struct rbuf *curr_buf = NULL;
> -	int n;
> -
> -	if (flags == O_APPEND)
> -		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> -	else
> -		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -
> -	while (1) {
> -		n = read(fd,
> -			 curr_buf->buffer + curr_buf->nbytes,
> -			 size ?
> -			     min((int) (size - rimg->size), BUF_SIZE - curr_buf->nbytes) :
> -			     BUF_SIZE - curr_buf->nbytes);
> -		if (n == 0) {
> -			if (close_fd)
> -				close(fd);
> -			return rimg->size;
> -		} else if (n > 0) {
> -			curr_buf->nbytes += n;
> -			rimg->size += n;
> -			if (curr_buf->nbytes == BUF_SIZE) {
> -			  struct rbuf *buf = malloc(sizeof(struct rbuf));
> -				if (buf == NULL) {
> -					pr_perror("Unable to allocate remote_buffer structures");
> -					if (close_fd)
> -						close(fd);
> -					return -1;
> -				}
> -				buf->nbytes = 0;
> -				list_add_tail(&(buf->l), &(rimg->buf_head));
> -				curr_buf = buf;
> -			}
> -			if (size && rimg->size == size) {
> -				if (close_fd)
> -					close(fd);
> -				return rimg->size;
> -			}
> -		} else {
> -			pr_perror("Read on %s:%s socket failed",
> -				rimg->path, rimg->snapshot_id);
> -			if (close_fd)
> -				close(fd);
> -			return -1;
> -		}
> -	}
> -}
> -
> -int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
> -{
> -
> -	int n, nblocks = 0;
> -
> -	if (flags != O_APPEND) {
> -		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> -		rimg->curr_sent_bytes = 0;
> -	}
> -
> -	while (1) {
> -		n = send(
> -		    fd,
> -		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
> -		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
> -		    MSG_NOSIGNAL);
> -		if (n > -1) {
> -			rimg->curr_sent_bytes += n;
> -			if (rimg->curr_sent_bytes == BUF_SIZE) {
> -				rimg->curr_sent_buf =
> -				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
> -				nblocks++;
> -				rimg->curr_sent_bytes = 0;
> -			} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
> -				if (close_fd)
> -					close(fd);
> -				return nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
> -			}
> -		} else if (errno == EPIPE || errno == ECONNRESET) {
> -			pr_warn("Connection for %s:%s was closed early than expected\n",
> -				rimg->path, rimg->snapshot_id);
> -			return 0;
> -		} else {
> -			pr_perror("Write on %s:%s socket failed",
> -				rimg->path, rimg->snapshot_id);
> -			return -1;
> -		}
> -	}
> -
> -}
> diff --git a/criu/img-remote.c b/criu/img-remote.c
> index 2bf62d2..f812c52 100644
> --- a/criu/img-remote.c
> +++ b/criu/img-remote.c
> @@ -8,7 +8,6 @@
>  #include "xmalloc.h"
>  #include "criu-log.h"
>  #include "img-remote.h"
> -#include "img-remote-proto.h"
>  #include "images/remote-image.pb-c.h"
>  #include "protobuf-desc.h"
>  #include <fcntl.h>
> @@ -16,11 +15,37 @@
>  #include "common/compiler.h"
>  #include "cr_options.h"
> 
> +#include <semaphore.h>
> +#include "sys/un.h"
> +#include <pthread.h>
> +#include <sys/file.h>
> +
> +#include "protobuf.h"
> +#include "image.h"
> +
>  #define PB_LOCAL_IMAGE_SIZE PATHLEN
> 
>  static char *snapshot_id;
>  bool restoring = true;
> 
> +LIST_HEAD(rimg_head);
> +pthread_mutex_t rimg_lock = PTHREAD_MUTEX_INITIALIZER;
> +
> +pthread_mutex_t proxy_to_cache_lock = PTHREAD_MUTEX_INITIALIZER;
> +
> +LIST_HEAD(workers_head);
> +pthread_mutex_t workers_lock = PTHREAD_MUTEX_INITIALIZER;
> +sem_t workers_semph;
> +
> +struct rimage * (*wait_for_image) (struct wthread *wt);
> +
> +bool finished = false;
> +int writing = 0;
> +int forwarding = 0;
> +int proxy_to_cache_fd;
> +int local_req_fd;
> +
> +
>  LIST_HEAD(snapshot_head);
> 
>  /* A snapshot is a dump or pre-dump operation. Each snapshot is identified by an
> @@ -48,9 +73,699 @@ void add_snapshot(struct snapshot *snapshot)
>  	list_add_tail(&(snapshot->l), &snapshot_head);
>  }
> 
> +struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path)
> +{
> +	struct rimage *rimg = NULL;
> +
> +	pthread_mutex_lock(&rimg_lock);
> +	list_for_each_entry(rimg, &rimg_head, l) {
> +		if (!strncmp(rimg->path, path, PATHLEN) &&
> +		    !strncmp(rimg->snapshot_id, snapshot_id, PATHLEN)) {
> +			pthread_mutex_unlock(&rimg_lock);
> +			return rimg;
> +		}
> +	}
> +	pthread_mutex_unlock(&rimg_lock);
> +	return NULL;
> +}
> +
> +static struct wthread *get_wt_by_name(const char *snapshot_id, const char *path)
> +{
> +	struct wthread *wt = NULL;
> +
> +	pthread_mutex_lock(&workers_lock);
> +	list_for_each_entry(wt, &workers_head, l) {
> +		if (!strncmp(wt->path, path, PATHLEN) &&
> +		   !strncmp(wt->snapshot_id, snapshot_id, PATHLEN)) {
> +			pthread_mutex_unlock(&workers_lock);
> +			return wt;
> +		}
> +	}
> +	pthread_mutex_unlock(&workers_lock);
> +	return NULL;
> +}
> +
> +static int init_sync_structures(void)
> +{
> +	if (sem_init(&workers_semph, 0, 0) != 0) {
> +		pr_perror("Workers semaphore init failed");
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +void prepare_recv_rimg(void)
> +{
> +	pthread_mutex_lock(&rimg_lock);
> +	writing++;
> +	pthread_mutex_unlock(&rimg_lock);
> +}
> +
> +void finalize_recv_rimg(struct rimage *rimg)
> +{
> +
> +	pthread_mutex_lock(&rimg_lock);
> +
> +	if (rimg)
> +		list_add_tail(&(rimg->l), &rimg_head);
> +	writing--;
> +	pthread_mutex_unlock(&rimg_lock);
> +	/* Wake thread waiting for this image. */
> +	if (rimg) {
> +		struct wthread *wt = get_wt_by_name(rimg->snapshot_id, rimg->path);
> +		if (wt)
> +			sem_post(&(wt->wakeup_sem));
> +	}
> +}
> +
> +bool is_receiving(void)
> +{
> +	int ret;
> +
> +	pthread_mutex_lock(&rimg_lock);
> +	ret = writing;
> +	pthread_mutex_unlock(&rimg_lock);
> +	return ret > 0;
> +}
> +
> +static void prepare_fwd_rimg(void)
> +{
> +	pthread_mutex_lock(&rimg_lock);
> +	forwarding++;
> +	pthread_mutex_unlock(&rimg_lock);
> +}
> +
> +static void finalize_fwd_rimg(void)
> +{
> +	pthread_mutex_lock(&rimg_lock);
> +	forwarding--;
> +	pthread_mutex_unlock(&rimg_lock);
> +}
> +
> +static bool is_forwarding(void)
> +{
> +	int ret;
> +
> +	pthread_mutex_lock(&rimg_lock);
> +	ret = forwarding;
> +	pthread_mutex_unlock(&rimg_lock);
> +	return ret > 0;
> +}
> +
> +/* This function is called when no more images are coming. Threads still waiting
> + * for images will be awaken to send a ENOENT (no such file) to the requester.
> + */
> +void unlock_workers(void)
> +{
> +	struct wthread *wt = NULL;
> +
> +	pthread_mutex_lock(&workers_lock);
> +	list_for_each_entry(wt, &workers_head, l)
> +		sem_post(&(wt->wakeup_sem));
> +	pthread_mutex_unlock(&workers_lock);
> +}
> +
> +int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*))
> +{
> +	if (background) {
> +		if (daemon(1, 0) == -1) {
> +			pr_perror("Can't run service server in the background");
> +			return -1;
> +		}
> +	}
> +	wait_for_image = wfi;
> +	return init_sync_structures();
> +}
> +
> +int setup_TCP_server_socket(int port)
> +{
> +	struct sockaddr_in serv_addr;
> +	int sockopt = 1;
> +	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> +
> +	if (sockfd < 0) {
> +		pr_perror("Unable to open image socket");
> +		return -1;
> +	}
> +
> +	bzero((char *) &serv_addr, sizeof(serv_addr));
> +	serv_addr.sin_family = AF_INET;
> +	serv_addr.sin_addr.s_addr = INADDR_ANY;
> +	serv_addr.sin_port = htons(port);
> +
> +	if (setsockopt(
> +	    sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof(sockopt)) == -1) {
> +		pr_perror("Unable to set SO_REUSEADDR");
> +		goto err;
> +	}
> +
> +	if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> +		pr_perror("Unable to bind image socket");
> +		goto err;
> +	}
> +
> +	if (listen(sockfd, DEFAULT_LISTEN)) {
> +		pr_perror("Unable to listen image socket");
> +		goto err;
> +	}
> +
> +	return sockfd;
> +err:
> +	close(sockfd);
> +	return -1;
> +}
> +
> +int setup_TCP_client_socket(char *hostname, int port)
> +{
> +	int sockfd;
> +	struct sockaddr_in serv_addr;
> +	struct hostent *server;
> +
> +	sockfd = socket(AF_INET, SOCK_STREAM, 0);
> +	if (sockfd < 0) {
> +		pr_perror("Unable to open remote image socket");
> +		return -1;
> +	}
> +
> +	server = gethostbyname(hostname);
> +	if (server == NULL) {
> +		pr_perror("Unable to get host by name (%s)", hostname);
> +		goto err;
> +	}
> +
> +	bzero((char *) &serv_addr, sizeof(serv_addr));
> +	serv_addr.sin_family = AF_INET;
> +	bcopy((char *) server->h_addr,
> +	      (char *) &serv_addr.sin_addr.s_addr,
> +	      server->h_length);
> +	serv_addr.sin_port = htons(port);
> +
> +	if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> +		pr_perror("Unable to connect to remote %s", hostname);
> +		goto err;
> +	}
> +
> +	return sockfd;
> +err:
> +	close(sockfd);
> +	return -1;
> +}
> +
> +int setup_UNIX_server_socket(char *path)
> +{
> +	struct sockaddr_un addr;
> +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> +
> +	if (sockfd < 0) {
> +		pr_perror("Unable to open image socket");
> +		return -1;
> +	}
> +
> +	memset(&addr, 0, sizeof(addr));
> +	addr.sun_family = AF_UNIX;
> +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> +
> +	unlink(path);
> +
> +	if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
> +		pr_perror("Unable to bind image socket");
> +		goto err;
> +	}
> +
> +	if (listen(sockfd, 50) == -1) {
> +		pr_perror("Unable to listen image socket");
> +		goto err;
> +	}
> +
> +	return sockfd;
> +err:
> +	close(sockfd);
> +	return -1;
> +}
> +
> +static int setup_UNIX_client_socket(char *path)
> +{
> +	struct sockaddr_un addr;
> +	int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> +
> +	if (sockfd < 0) {
> +		pr_perror("Unable to open local image socket");
> +		return -1;
> +	}
> +
> +	memset(&addr, 0, sizeof(addr));
> +	addr.sun_family = AF_UNIX;
> +	strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> +
> +	if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
> +		pr_perror("Unable to connect to local socket: %s", path);
> +		close(sockfd);
> +		return -1;
> +	}
> +
> +	return sockfd;
> +}
> +
> +static int64_t pb_write_obj(int fd, void *obj, int type)
> +{
> +	struct cr_img img;
> +
> +	img._x.fd = fd;
> +	bfd_setraw(&img._x);
> +	return pb_write_one(&img, obj, type);
> +}
> +
> +static int64_t pb_read_obj(int fd, void **pobj, int type)
> +{
> +	struct cr_img img;
> +
> +	img._x.fd = fd;
> +	bfd_setraw(&img._x);
> +	return do_pb_read_one(&img, pobj, type, true);
> +}
> +
> +static int64_t write_header(int fd, char *snapshot_id, char *path, int flags)
> +{
> +	LocalImageEntry li = LOCAL_IMAGE_ENTRY__INIT;
> +
> +	li.name = path;
> +	li.snapshot_id = snapshot_id;
> +	li.open_mode = flags;
> +	return pb_write_obj(fd, &li, PB_LOCAL_IMAGE);
> +}
> +
> +static int64_t write_reply_header(int fd, int error)
> +{
> +	LocalImageReplyEntry lir = LOCAL_IMAGE_REPLY_ENTRY__INIT;
> +
> +	lir.error = error;
> +	return pb_write_obj(fd, &lir, PB_LOCAL_IMAGE_REPLY);
> +}
> +
> +int64_t write_remote_header(int fd, char *snapshot_id, char *path, int flags, uint64_t size)
> +{
> +	RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
> +
> +	ri.name = path;
> +	ri.snapshot_id = snapshot_id;
> +	ri.open_mode = flags;
> +	ri.size = size;
> +	return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
> +}
> +
> +static int64_t read_header(int fd, char *snapshot_id, char *path, int *flags)
> +{
> +	LocalImageEntry *li;
> +	int ret = pb_read_obj(fd, (void **)&li, PB_LOCAL_IMAGE);
> +
> +	if (ret > 0) {
> +		strncpy(snapshot_id, li->snapshot_id, PATHLEN);
> +		strncpy(path, li->name, PATHLEN);
> +		*flags = li->open_mode;
> +	}
> +	free(li);
> +	return ret;
> +}
> +
> +static int64_t read_reply_header(int fd, int *error)
> +{
> +	LocalImageReplyEntry *lir;
> +	int ret = pb_read_obj(fd, (void **)&lir, PB_LOCAL_IMAGE_REPLY);
> +
> +	if (ret > 0)
> +		*error = lir->error;
> +	free(lir);
> +	return ret;
> +}
> +
> +int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *flags, uint64_t *size)
> +{
> +	RemoteImageEntry *ri;
> +	int ret = pb_read_obj(fd, (void **)&ri, PB_REMOTE_IMAGE);
> +
> +	if (ret > 0) {
> +		strncpy(snapshot_id, ri->snapshot_id, PATHLEN);
> +		strncpy(path, ri->name, PATHLEN);
> +		*flags = ri->open_mode;
> +		*size = ri->size;
> +	}
> +	free(ri);
> +	return ret;
> +}
> +
> +static struct wthread *new_worker(void)
> +{
> +	struct wthread *wt = malloc(sizeof(struct wthread));
> +
> +	if (!wt) {
> +		pr_perror("Unable to allocate worker thread structure");
> +		goto err;
> +	}
> +	if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
> +		pr_perror("Workers semaphore init failed");
> +		goto err;
> +	}
> +	return wt;
> +err:
> +	free(wt);
> +	return NULL;
> +}
> +
> +static void add_worker(struct wthread *wt)
> +{
> +	pthread_mutex_lock(&workers_lock);
> +	list_add_tail(&(wt->l), &workers_head);
> +	pthread_mutex_unlock(&workers_lock);
> +	sem_post(&workers_semph);
> +}
> +
> +void join_workers(void)
> +{
> +	struct wthread *wthread = NULL;
> +
> +	while (! list_empty(&workers_head)) {
> +		wthread = list_entry(workers_head.next, struct wthread, l);
> +		pthread_join(wthread->tid, NULL);
> +		list_del(&(wthread->l));
> +		free(wthread);
> +	}
> +}
> +
> +static struct rimage *new_remote_image(char *path, char *snapshot_id)
> +{
> +	struct rimage *rimg = malloc(sizeof(struct rimage));
> +	struct rbuf *buf = malloc(sizeof(struct rbuf));
> +
> +	if (rimg == NULL || buf == NULL) {
> +		pr_perror("Unable to allocate remote image structures");
> +		goto err;
> +	}
> +
> +	strncpy(rimg->path, path, PATHLEN -1 );
> +	rimg->path[PATHLEN - 1] = '\0';
> +	strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
> +	rimg->snapshot_id[PATHLEN - 1] = '\0';
> +	rimg->size = 0;
> +	buf->nbytes = 0;
> +	INIT_LIST_HEAD(&(rimg->buf_head));
> +	list_add_tail(&(buf->l), &(rimg->buf_head));
> +	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	rimg->curr_sent_bytes = 0;
> +
> +	if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> +		pr_err("Remote image in_use mutex init failed\n");
> +		goto err;
> +	}
> +	return rimg;
> +err:
> +	free(rimg);
> +	free(buf);
> +	return NULL;
> +}
> +
> +/* Clears a remote image struct for reusing it. */
> +static struct rimage *clear_remote_image(struct rimage *rimg)
> +{
> +	pthread_mutex_lock(&(rimg->in_use));
> +
> +	while (!list_is_singular(&(rimg->buf_head))) {
> +		struct rbuf *buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> +
> +		list_del(rimg->buf_head.prev);
> +		free(buf);
> +	}
> +
> +	list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> +	rimg->size = 0;
> +	rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +	rimg->curr_sent_bytes = 0;
> +
> +	pthread_mutex_unlock(&(rimg->in_use));
> +
> +	return rimg;
> +}
> +
> +struct rimage *prepare_remote_image(char *path, char *snapshot_id, int open_mode)
> +{
> +	struct rimage *rimg = get_rimg_by_name(snapshot_id, path);
> +	/* There is no record of such image, create a new one. */
> +
> +	if (rimg == NULL)
> +		return new_remote_image(path, snapshot_id);
> +
> +	pthread_mutex_lock(&rimg_lock);
> +	list_del(&(rimg->l));
> +	pthread_mutex_unlock(&rimg_lock);
> +
> +	/* There is already an image record. Simply return it for appending. */
> +	if (open_mode == O_APPEND)
> +		return rimg;
> +	/* There is already an image record. Clear it for writing. */
> +	else
> +		return clear_remote_image(rimg);
> +}
> +
> +static void *process_local_read(struct wthread *wt)
> +{
> +	struct rimage *rimg = NULL;
> +	int64_t ret;
> +	/* TODO - split wait_for_image
> +	 * in cache - improve the parent stuff
> +	 * in proxy - do not wait for anything, return no file
> +	 */
> +	rimg = wait_for_image(wt);
> +	if (!rimg) {
> +		pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
> +		if (write_reply_header(wt->fd, ENOENT) < 0)
> +			pr_perror("Error writing reply header for unexisting image");
> +		close(wt->fd);
> +		return NULL;
> +	} else {
> +		if (write_reply_header(wt->fd, 0) < 0) {
> +			pr_perror("Error writing reply header for %s:%s",
> +					wt->path, wt->snapshot_id);
> +			close(wt->fd);
> +			return NULL;
> +		}
> +	}
> +
> +	pthread_mutex_lock(&(rimg->in_use));
> +	ret = send_image(wt->fd, rimg, wt->flags, true);
> +	if (ret < 0)
> +		pr_perror("Unable to send %s:%s to CRIU (sent %ld bytes)",
> +				rimg->path, rimg->snapshot_id, (long)ret);
> +	else
> +		pr_info("Finished sending %s:%s to CRIU (sent %ld bytes)\n",
> +				rimg->path, rimg->snapshot_id, (long)ret);
> +	pthread_mutex_unlock(&(rimg->in_use));
> +	return NULL;
> +}
> +
> +static void *process_local_image_connection(void *ptr)
> +{
> +	struct wthread *wt = (struct wthread *) ptr;
> +	struct rimage *rimg = NULL;
> +	int64_t ret;
> +
> +	/* NOTE: the code inside this if is shared for both cache and proxy. */
> +	if (wt->flags == O_RDONLY)
> +		return process_local_read(wt);
> +
> +	/* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
> +	 * through TCP (see accept_remote_image_connections).
> +	 */
> +	rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
> +	ret = recv_image(wt->fd, rimg, 0, wt->flags, true);
> +	if (ret < 0) {
> +		pr_perror("Unable to receive %s:%s to CRIU (received %ld bytes)",
> +				rimg->path, rimg->snapshot_id, (long)ret);
> +		finalize_recv_rimg(NULL);
> +		return NULL;
> +	}
> +	finalize_recv_rimg(rimg);
> +	pr_info("Finished receiving %s:%s (received %ld bytes)\n",
> +			rimg->path, rimg->snapshot_id, (long)ret);
> +
> +
> +	if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
> +		finished = true;
> +		shutdown(local_req_fd, SHUT_RD);
> +	} else {
> +		pthread_mutex_lock(&proxy_to_cache_lock);
> +		ret = forward_image(rimg);
> +		pthread_mutex_unlock(&proxy_to_cache_lock);
> +	}
> +
> +	finalize_fwd_rimg();
> +	if (ret < 0) {
> +		pr_perror("Unable to forward %s:%s to Image Cache",
> +				rimg->path, rimg->snapshot_id);
> +
> +		return NULL;
> +	}
> +
> +	if (finished && !is_forwarding() && !is_receiving()) {
> +		pr_info("Closing connection to Image Cache.\n");
> +		close(proxy_to_cache_fd);
> +		unlock_workers();
> +	}
> +	return NULL;
> +}
> +
> +
> +void *accept_local_image_connections(void *port)
> +{
> +	int fd = *((int *) port);
> +	int cli_fd;
> +	struct sockaddr_in cli_addr;
> +
> +	socklen_t clilen = sizeof(cli_addr);
> +	pthread_t tid;
> +	struct wthread *wt;
> +
> +	while (1) {
> +		cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
> +		if (cli_fd < 0) {
> +			if (!finished)
> +				pr_perror("Unable to accept local image connection");
> +			close(cli_fd);
> +			return NULL;
> +		}
> +
> +		wt = new_worker();
> +		wt->fd = cli_fd;
> +
> +		if (read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags)) < 0) {
> +			pr_err("Error reading local image header\n");
> +			goto err;
> +		}
> +
> +		pr_info("Received %s request for %s:%s\n",
> +		    wt->flags == O_RDONLY ? "read" :
> +			wt->flags == O_APPEND ? "append" : "write",
> +		    wt->path, wt->snapshot_id);
> +
> +		/* These function calls are used to prevent other threads from
> +		 * thinking that no more images are coming.
> +		 */
> +		if (wt->flags != O_RDONLY) {
> +			prepare_recv_rimg();
> +			prepare_fwd_rimg();
> +		}
> +
> +		if (pthread_create(
> +		    &tid, NULL, process_local_image_connection, (void *) wt)) {
> +			pr_perror("Unable to create worker thread");
> +			goto err;
> +		}
> +
> +		wt->tid = tid;
> +		add_worker(wt);
> +	}
> +err:
> +	close(cli_fd);
> +	free(wt);
> +	return NULL;
> +}
> +
> +/* Note: size is a limit on how much we want to read from the socket.  Zero means
> + * read until the socket is closed.
> + */
> +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
> +{
> +	struct rbuf *curr_buf = NULL;
> +	int n;
> +
> +	if (flags == O_APPEND)
> +		curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
> +	else
> +		curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +
> +	while (1) {
> +		n = read(fd,
> +			 curr_buf->buffer + curr_buf->nbytes,
> +			 size ?
> +			     min((int) (size - rimg->size), BUF_SIZE - curr_buf->nbytes) :
> +			     BUF_SIZE - curr_buf->nbytes);
> +		if (n == 0) {
> +			if (close_fd)
> +				close(fd);
> +			return rimg->size;
> +		} else if (n > 0) {
> +			curr_buf->nbytes += n;
> +			rimg->size += n;
> +			if (curr_buf->nbytes == BUF_SIZE) {
> +			  struct rbuf *buf = malloc(sizeof(struct rbuf));
> +				if (buf == NULL) {
> +					pr_perror("Unable to allocate remote_buffer structures");
> +					if (close_fd)
> +						close(fd);
> +					return -1;
> +				}
> +				buf->nbytes = 0;
> +				list_add_tail(&(buf->l), &(rimg->buf_head));
> +				curr_buf = buf;
> +			}
> +			if (size && rimg->size == size) {
> +				if (close_fd)
> +					close(fd);
> +				return rimg->size;
> +			}
> +		} else {
> +			pr_perror("Read on %s:%s socket failed",
> +				rimg->path, rimg->snapshot_id);
> +			if (close_fd)
> +				close(fd);
> +			return -1;
> +		}
> +	}
> +}
> +
> +int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
> +{
> +
> +	int n, nblocks = 0;
> +
> +	if (flags != O_APPEND) {
> +		rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
> +		rimg->curr_sent_bytes = 0;
> +	}
> +
> +	while (1) {
> +		n = send(
> +		    fd,
> +		    rimg->curr_sent_buf->buffer + rimg->curr_sent_bytes,
> +		    min(BUF_SIZE, rimg->curr_sent_buf->nbytes) - rimg->curr_sent_bytes,
> +		    MSG_NOSIGNAL);
> +		if (n > -1) {
> +			rimg->curr_sent_bytes += n;
> +			if (rimg->curr_sent_bytes == BUF_SIZE) {
> +				rimg->curr_sent_buf =
> +				    list_entry(rimg->curr_sent_buf->l.next, struct rbuf, l);
> +				nblocks++;
> +				rimg->curr_sent_bytes = 0;
> +			} else if (rimg->curr_sent_bytes == rimg->curr_sent_buf->nbytes) {
> +				if (close_fd)
> +					close(fd);
> +				return nblocks*BUF_SIZE + rimg->curr_sent_buf->nbytes;
> +			}
> +		} else if (errno == EPIPE || errno == ECONNRESET) {
> +			pr_warn("Connection for %s:%s was closed early than expected\n",
> +				rimg->path, rimg->snapshot_id);
> +			return 0;
> +		} else {
> +			pr_perror("Write on %s:%s socket failed",
> +				rimg->path, rimg->snapshot_id);
> +			return -1;
> +		}
> +	}
> +
> +}
> +
>  int read_remote_image_connection(char *snapshot_id, char *path)
>  {
> -	int error;
> +	int error = 0;
>  	int sockfd = setup_UNIX_client_socket(restoring ? DEFAULT_CACHE_SOCKET: DEFAULT_PROXY_SOCKET);
> 
>  	if (sockfd < 0) {
> diff --git a/criu/include/img-remote-proto.h b/criu/include/img-remote-proto.h
> deleted file mode 100644
> index 13cf6c6..0000000
> --- a/criu/include/img-remote-proto.h
> +++ /dev/null
> @@ -1,88 +0,0 @@
> -#ifndef IMAGE_REMOTE_PVT_H
> -#define	IMAGE_REMOTE_PVT_H
> -
> -#include <stdbool.h>
> -#include <stdint.h>
> -#include "common/list.h"
> -#include "img-remote.h"
> -#include <pthread.h>
> -#include <semaphore.h>
> -
> -#define DEFAULT_LISTEN 50
> -#ifndef PAGESIZE
> -#define PAGESIZE 4096
> -#endif
> -#define BUF_SIZE PAGESIZE
> -
> -struct rbuf {
> -	char buffer[BUF_SIZE];
> -	int nbytes; /* How many bytes are in the buffer. */
> -	struct list_head l;
> -};
> -
> -struct rimage {
> -	char path[PATHLEN];
> -	char snapshot_id[PATHLEN];
> -	struct list_head l;
> -	struct list_head buf_head;
> -	/* Used to track already sent buffers when the image is appended. */
> -	struct rbuf *curr_sent_buf;
> -	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
> -	int curr_sent_bytes;
> -	uint64_t size; /* number of bytes */
> -	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
> -};
> -
> -struct wthread {
> -	pthread_t tid;
> -	struct list_head l;
> -	/* Client fd. */
> -	int fd;
> -	/* The path and snapshot_id identify the request handled by this thread. */
> -	char path[PATHLEN];
> -	char snapshot_id[PATHLEN];
> -	int flags;
> -	/* This semph is used to wake this thread if the image is in memory.*/
> -	sem_t wakeup_sem;
> -};
> -
> -/* This variable is used to indicate when the dump is finished. */
> -extern bool finished;
> -/* This is the proxy to cache TCP socket FD. */
> -extern int proxy_to_cache_fd;
> -/* This the unix socket used to fulfill local requests. */
> -extern int local_req_fd;
> -
> -int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*));
> -
> -void join_workers(void);
> -void unlock_workers(void);
> -
> -void prepare_recv_rimg(void);
> -void finalize_recv_rimg(struct rimage *rimg);
> -struct rimage *prepare_remote_image(char *path, char *namesapce, int flags);
> -struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path);
> -bool is_receiving(void);
> -
> -void *accept_local_image_connections(void *ptr);
> -void *accept_remote_image_connections(void *ptr);
> -
> -int64_t forward_image(struct rimage *rimg);
> -int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
> -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
> -
> -int64_t pb_write_obj(int fd, void *obj, int type);
> -int64_t pb_read_obj(int fd, void **obj, int type);
> -
> -int64_t write_header(int fd, char *snapshot_id, char *path, int open_mode);
> -int64_t read_header(int fd, char *snapshot_id, char *path, int *open_mode);
> -int64_t write_reply_header(int fd, int error);
> -int64_t read_reply_header(int fd, int *error);
> -int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
> -int64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);
> -
> -int setup_TCP_server_socket(int port);
> -int setup_TCP_client_socket(char *hostname, int port);
> -int setup_UNIX_client_socket(char *path);
> -int setup_UNIX_server_socket(char *path);
> -#endif
> diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> index 38acbd2..1771d31 100644
> --- a/criu/include/img-remote.h
> +++ b/criu/include/img-remote.h
> @@ -1,6 +1,11 @@
>  #include <limits.h>
>  #include <stdbool.h>
> 
> +#include <stdint.h>
> +#include "common/list.h"
> +#include <pthread.h>
> +#include <semaphore.h>
> +
>  #ifndef IMAGE_REMOTE_H
>  #define	IMAGE_REMOTE_H
> 
> @@ -14,6 +19,76 @@
>  #define DEFAULT_CACHE_PORT 9996
>  #define DEFAULT_CACHE_HOST "localhost"
> 
> +#define DEFAULT_LISTEN 50
> +#ifndef PAGESIZE
> +#define PAGESIZE 4096
> +#endif
> +#define BUF_SIZE PAGESIZE
> +
> +struct rbuf {
> +	char buffer[BUF_SIZE];
> +	int nbytes; /* How many bytes are in the buffer. */
> +	struct list_head l;
> +};
> +
> +struct rimage {
> +	char path[PATHLEN];
> +	char snapshot_id[PATHLEN];
> +	struct list_head l;
> +	struct list_head buf_head;
> +	/* Used to track already sent buffers when the image is appended. */
> +	struct rbuf *curr_sent_buf;
> +	/* Similar to the previous field. Number of bytes sent in 'curr_sent_buf'. */
> +	int curr_sent_bytes;
> +	uint64_t size; /* number of bytes */
> +	pthread_mutex_t in_use; /* Only one operation at a time, per image. */
> +};
> +
> +struct wthread {
> +	pthread_t tid;
> +	struct list_head l;
> +	/* Client fd. */
> +	int fd;
> +	/* The path and snapshot_id identify the request handled by this thread. */
> +	char path[PATHLEN];
> +	char snapshot_id[PATHLEN];
> +	int flags;
> +	/* This semph is used to wake this thread if the image is in memory.*/
> +	sem_t wakeup_sem;
> +};
> +
> +/* This variable is used to indicate when the dump is finished. */
> +extern bool finished;
> +/* This is the proxy to cache TCP socket FD. */
> +extern int proxy_to_cache_fd;
> +/* This is the unix socket used to fulfill local requests. */
> +extern int local_req_fd;
> +
> +int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*));
> +
> +void join_workers(void);
> +void unlock_workers(void);
> +
> +void prepare_recv_rimg(void);
> +void finalize_recv_rimg(struct rimage *rimg);
> +struct rimage *prepare_remote_image(char *path, char *namesapce, int flags);
> +struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path);
> +bool is_receiving(void);
> +
> +void *accept_local_image_connections(void *ptr);
> +void *accept_remote_image_connections(void *ptr);
> +
> +int64_t forward_image(struct rimage *rimg);
> +int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
> +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
> +
> +int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *open_mode, uint64_t *size);
> +int64_t write_remote_header(int fd, char *snapshot_id, char *path, int open_mode, uint64_t size);
> +
> +int setup_TCP_server_socket(int port);
> +int setup_TCP_client_socket(char *hostname, int port);
> +int setup_UNIX_server_socket(char *path);
> +
>  /* Called by restore to get the fd correspondent to a particular path.  This call
>   * will block until the connection is received.
>   */
> -- 
> 2.7.4
> 
> _______________________________________________
> CRIU mailing list
> CRIU at openvz.org
> https://lists.openvz.org/mailman/listinfo/criu
> 

-- 
Sincerely yours,
Mike.



More information about the CRIU mailing list