[CRIU] Process Migration using Sockets v2 - Patch 2/2
Pavel Emelyanov
xemul at parallels.com
Mon Oct 12 02:38:43 PDT 2015
On 10/09/2015 05:46 PM, Rodrigo Bruno wrote:
> Hi,
>
> here goes the second part.
Thanks, this time it's more clear. Please, find my comments inline.
> This patch adds:
>
> 1. image-cache.c - the component that is on the same node as CRIU restore. It holds
> image files in memory waiting for CRIU restore to ask them
>
> 2. image-proxy.c - the component that is on the same node as CRIU dump. It holds
> image files in memory (because a future dump/predump might need them) and also
> forwards them to the image-cache.
>
> 3. image-remote-pvt.c - implements almost all the image receiving and sending
> engine.
What does the "pvt" stand for?
> 4. include/image-remote-pvt.h - exports function declarations used by both image-cache
> image-proxy and image-remote.
>
>
> Signed-off-by: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
>>From e690070808e6f1b459f033e162c910bc2d02859a Mon Sep 17 00:00:00 2001
> From: rodrigo-bruno <rbruno at gsd.inesc-id.pt>
> Date: Fri, 9 Oct 2015 15:37:17 +0100
> Subject: [PATCH] Process migration using sockets (2/2)
>
> ---
> image-cache.c | 54 +++++
> image-proxy.c | 74 ++++++
> image-remote-pvt.c | 566 +++++++++++++++++++++++++++++++++++++++++++++
> include/image-remote-pvt.h | 52 +++++
> 4 files changed, 746 insertions(+)
> create mode 100644 image-cache.c
> create mode 100644 image-proxy.c
> create mode 100644 image-remote-pvt.c
> create mode 100644 include/image-remote-pvt.h
>
> diff --git a/image-cache.c b/image-cache.c
> new file mode 100644
> index 0000000..e2030f4
> --- /dev/null
> +++ b/image-cache.c
> @@ -0,0 +1,54 @@
> +#include <unistd.h>
> +
> +#include "image-remote.h"
> +#include "image-remote-pvt.h"
> +#include "criu-log.h"
> +#include <pthread.h>
> +
> +static void* cache_remote_image(void* ptr)
> +{
> + remote_image* rimg = (remote_image*) ptr;
> +
> + if (!strncmp(rimg->path, DUMP_FINISH, sizeof (DUMP_FINISH)))
> + {
> + close(rimg->src_fd);
> + return NULL;
> + }
> +
> + prepare_put_rimg();
> + recv_remote_image(rimg->src_fd, rimg->path, &rimg->buf_head, true);
> + finalize_put_rimg(rimg);
> +
> + return NULL;
> +}
> +
> +int image_cache(unsigned short cache_put_port)
> +{
> + pthread_t get_thr, put_thr;
> + int put_fd, get_fd;
> +
> + pr_info("Put Port %d, Get Path %s\n", cache_put_port, CACHE_GET);
> +
> + put_fd = setup_TCP_server_socket(cache_put_port);
Proxy and cache should be able to work over given connections. Look at the
opts.ps_socket field for details how this works in case of page-server.
> + get_fd = setup_UNIX_server_socket(CACHE_GET);
> +
> + if (init_daemon(cache_remote_image))
> + return -1;
> +
> + if (pthread_create(
> + &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> + pr_perror("Unable to create put thread");
> + return -1;
> + }
> + if (pthread_create(
> + &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> + pr_perror("Unable to create get thread");
> + return -1;
> + }
Can you describe the logic behind the multithreading and sems/mutexes you use?
> +
> + join_workers();
> +
> + pthread_join(put_thr, NULL);
> + pthread_join(get_thr, NULL);
> + return 0;
> +}
> diff --git a/image-proxy.c b/image-proxy.c
> new file mode 100644
> index 0000000..24b8935
> --- /dev/null
> +++ b/image-proxy.c
> @@ -0,0 +1,74 @@
> +#include <unistd.h>
> +
> +#include "image-remote.h"
> +#include "image-remote-pvt.h"
> +#include "criu-log.h"
> +#include <pthread.h>
> +
> +static char* dst_host;
> +static unsigned short dst_port;
> +
> +static void* proxy_remote_image(void* ptr)
> +{
> + remote_image* rimg = (remote_image*) ptr;
> + rimg->dst_fd = setup_TCP_client_socket(dst_host, dst_port);
> + if (rimg->dst_fd < 0) {
> + pr_perror("Unable to open recover image socket");
> + return NULL;
> + }
> +
> + if (write_header(rimg->dst_fd, rimg->namespace, rimg->path) < 0) {
> + pr_perror("Error writing header for %s:%s",
> + rimg->path, rimg->namespace);
> + return NULL;
> + }
> +
> + prepare_put_rimg();
> +
> + if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
For control messages I would suggest integer field, rather than magic names
in paths :)
> + close(rimg->dst_fd);
> + finalize_put_rimg(rimg);
> + return NULL;
> + }
> + if (recv_remote_image(rimg->src_fd, rimg->path, &(rimg->buf_head), false) < 0) {
> + return NULL;
> + }
> + finalize_put_rimg(rimg);
> + send_remote_image(rimg->dst_fd, rimg->path, &(rimg->buf_head), true);
> + return NULL;
> +}
> +
> +int image_proxy(char* fwd_host, unsigned short fwd_port)
> +{
> + pthread_t get_thr, put_thr;
> + int put_fd, get_fd;
> +
> + dst_host = fwd_host;
> + dst_port = fwd_port;
> +
> + pr_info("Proxy Get Path %s, Put Path %s, Destination Host %s:%hu\n",
> + PROXY_GET, PROXY_PUT, fwd_host, fwd_port);
> +
> + put_fd = setup_UNIX_server_socket(PROXY_PUT);
> + get_fd = setup_UNIX_server_socket(PROXY_GET);
> +
> + if(init_daemon(proxy_remote_image))
> + return -1;
> +
> + if (pthread_create(
> + &put_thr, NULL, accept_put_image_connections, (void*) &put_fd)) {
> + pr_perror("Unable to create put thread");
> + return -1;
> + }
> + if (pthread_create(
> + &get_thr, NULL, accept_get_image_connections, (void*) &get_fd)) {
> + pr_perror("Unable to create get thread");
> + return -1;
> + }
> +
> + join_workers();
> +
> + pthread_join(put_thr, NULL);
> + pthread_join(get_thr, NULL);
> + return 0;
> +}
> diff --git a/image-remote-pvt.c b/image-remote-pvt.c
> new file mode 100644
> index 0000000..285bb91
> --- /dev/null
> +++ b/image-remote-pvt.c
> @@ -0,0 +1,566 @@
> +#include <unistd.h>
> +#include <stdlib.h>
> +
> +#include <semaphore.h>
> +#include <sys/socket.h>
> +#include <netinet/in.h>
> +#include <netdb.h>
> +#include "sys/un.h"
> +#include <pthread.h>
> +
> +#include "image-remote-pvt.h"
> +#include "criu-log.h"
> +
> +#include "protobuf.h"
> +#include "protobuf/remote-image.pb-c.h"
> +#include "image.h"
> +
> +typedef struct wthread {
> + pthread_t tid;
> + struct list_head l;
> +} worker_thread;
> +
> +static LIST_HEAD(rimg_head);
> +static pthread_mutex_t rimg_lock;
> +static sem_t rimg_semph;
> +
> +static LIST_HEAD(workers_head);
> +static pthread_mutex_t workers_lock;
> +static sem_t workers_semph;
> +
> +static int finished = 0;
> +static int putting = 0;
> +
> +static void* (*get_func)(void*);
> +static void* (*put_func)(void*);
What does get and put stand for here?
> +
> +static remote_image* get_rimg_by_name(const char* namespace, const char* path)
> +{
> + remote_image* rimg = NULL;
> + pthread_mutex_lock(&rimg_lock);
> + list_for_each_entry(rimg, &rimg_head, l) {
> + if( !strncmp(rimg->path, path, PATHLEN) &&
> + !strncmp(rimg->namespace, namespace, PATHLEN)) {
> + pthread_mutex_unlock(&rimg_lock);
> + return rimg;
> + }
> + }
> + pthread_mutex_unlock(&rimg_lock);
> + return NULL;
> +}
> +
> +static uint64_t sizeof_remote_buffer(struct list_head* rbuff_head)
> +{
> + uint64_t res = 0;
> + remote_buffer* aux = NULL;
> + list_for_each_entry(aux, rbuff_head, l) {
> + res += aux->nbytes;
> + }
Indentation.
> + return res;
> +}
> +
> +int init_sync_structures()
> +{
> + if (pthread_mutex_init(&rimg_lock, NULL) != 0) {
> + pr_perror("Remote image connection mutex init failed");
> + return -1;
> + }
> +
> + if (sem_init(&rimg_semph, 0, 0) != 0) {
> + pr_perror("Remote image connection semaphore init failed");
> + return -1;
> + }
> +
> + if (pthread_mutex_init(&workers_lock, NULL) != 0) {
> + pr_perror("Workers mutex init failed");
> + return -1;
> + }
> +
> + if (sem_init(&workers_semph, 0, 0) != 0) {
> + pr_perror("Workers semaphore init failed");
> + return -1;
> + }
> + return 0;
> +}
> +
> +static remote_image* wait_for_image(int cli_fd, char* namespace, char* path)
> +{
This routine waits for image to appear on proxy-cache connection, doesn't it? It looks
like the protocol between cache and proxy is more complex than just proxy sends header
and data over the wire, can you describe it in details?
> + remote_image *result;
> +
> + while (1) {
> + result = get_rimg_by_name(namespace, path);
> +
> + if (result != NULL) {
> + if (write_header(cli_fd, namespace, path) < 0) {
> + pr_perror("Error writing header for %s:%s",
> + path, namespace);
> + close(cli_fd);
> + return NULL;
> + }
> + return result;
> + }
> + /* The file does not exist and we do not expect new files */
> + if (finished && !putting) {
> + if (write_header(cli_fd, NULL_NAMESPACE, DUMP_FINISH) < 0) {
> + pr_perror("Error writing header for %s:%s",
> + DUMP_FINISH, NULL_NAMESPACE);
> + }
> + close(cli_fd);
> + return NULL;
> + }
> + /* The file does not exist but the request is for a parent file.
> + A parent file may not exist for the first process. */
> + if (!putting && !strncmp(path, PARENT_IMG, PATHLEN)) {
> + if (write_header(cli_fd, namespace, path) < 0) {
> + pr_perror("Error writing header for %s:%s",
> + path, namespace);
> + }
> + close(cli_fd);
> + return NULL;
> + }
> + sem_wait(&rimg_semph);
> + }
> +}
> +
> +void* get_remote_image(void* fd)
Should be static.
> +{
> + int cli_fd = (long) fd;
> + remote_image* rimg = NULL;
> + char path_buf[PATHLEN];
> + char namespace_buf[PATHLEN];
> +
> + if(read_header(cli_fd, namespace_buf, path_buf) < 0) {
> + pr_perror("Error reading header");
> + return NULL;
> + }
> +
> + pr_info("Received GET for %s:%s.\n", path_buf, namespace_buf);
> +
> + rimg = wait_for_image(cli_fd, namespace_buf, path_buf);
> + if (!rimg)
> + return NULL;
> +
> + rimg->dst_fd = cli_fd;
> + send_remote_image(rimg->dst_fd, rimg->path, &rimg->buf_head, false);
> + return NULL;
> +}
> +
> +void prepare_put_rimg()
> +{
> + pthread_mutex_lock(&rimg_lock);
> + putting++;
> + pthread_mutex_unlock(&rimg_lock);
> +}
> +
> +void finalize_put_rimg(remote_image* rimg)
> +{
> + pthread_mutex_lock(&rimg_lock);
> + list_add_tail(&(rimg->l), &rimg_head);
> + putting--;
> + pthread_mutex_unlock(&rimg_lock);
> + sem_post(&rimg_semph);
> +}
> +
> +int init_daemon(void *(*put_function) (void *))
> +{
> + get_func = get_remote_image;
> + put_func = put_function;
> + return init_sync_structures();
> +}
> +
> +int setup_TCP_server_socket(int port)
> +{
> + struct sockaddr_in serv_addr;
> + int sockopt = 1;
> +
> + int sockfd = socket(AF_INET, SOCK_STREAM, 0);
> + if (sockfd < 0) {
> + pr_perror("Unable to open image socket");
> + return -1;
> + }
> +
> + bzero((char *) &serv_addr, sizeof (serv_addr));
> + serv_addr.sin_family = AF_INET;
> + serv_addr.sin_addr.s_addr = INADDR_ANY;
> + serv_addr.sin_port = htons(port);
> +
> + if (setsockopt(
> + sockfd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof (sockopt)) == -1) {
> + pr_perror("Unable to set SO_REUSEADDR");
> + return -1;
> + }
> +
> + if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0) {
> + pr_perror("Unable to bind image socket");
> + return -1;
> + }
> +
> + if (listen(sockfd, DEFAULT_LISTEN)) {
> + pr_perror("Unable to listen image socket");
> + return -1;
> + }
> +
> + return sockfd;
> +}
> +
> +int setup_TCP_client_socket(char* hostname, int port)
> +{
> + int sockfd;
> + struct sockaddr_in serv_addr;
> + struct hostent *server;
> +
> + sockfd = socket(AF_INET, SOCK_STREAM, 0);
> + if (sockfd < 0) {
> + pr_perror("Unable to open remote image socket");
> + return -1;
> + }
> +
> + server = gethostbyname(hostname);
> + if (server == NULL) {
> + pr_perror("Unable to get host by name (%s)", hostname);
> + return -1;
> + }
> +
> + bzero((char *) &serv_addr, sizeof (serv_addr));
> + serv_addr.sin_family = AF_INET;
> + bcopy((char *) server->h_addr,
> + (char *) &serv_addr.sin_addr.s_addr,
> + server->h_length);
> + serv_addr.sin_port = htons(port);
> +
> + if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
> + pr_perror("Unable to connect to remote restore host %s", hostname);
> + return -1;
> + }
> +
> + return sockfd;
> +}
> +
> +int setup_UNIX_server_socket(char* path)
> +{
> + struct sockaddr_un addr;
> + int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> + if(sockfd < 0) {
> + pr_perror("Unable to open image socket");
> + return -1;
> + }
> +
> + memset(&addr, 0, sizeof(addr));
> + addr.sun_family = AF_UNIX;
> + strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> +
> + unlink(path);
> +
> + if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
> + pr_perror("Unable to bind image socket");
> + return -1;
> + }
> +
> + if (listen(sockfd, 50) == -1) {
> + pr_perror("Unable to listen image socket");
> + return -1;
> + }
> +
> + return sockfd;
> +}
> +
> +int setup_UNIX_client_socket(char* path)
> +{
> + struct sockaddr_un addr;
> + int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
> + if (sockfd < 0) {
> + pr_perror("Unable to open local image socket");
> + return -1;
> + }
> +
> + memset(&addr, 0, sizeof(addr));
> + addr.sun_family = AF_UNIX;
> + strncpy(addr.sun_path, path, sizeof(addr.sun_path)-1);
> +
> + if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
> + pr_perror("Unable to connect to local socket: %s", path);
> + return -1;
> + }
> +
> + return sockfd;
> +}
> +
> +int pb_write_obj(int fd, void* obj, int type)
These pb_* helpers are not used outside the .c file, so should be static
and removed from headers.
> +{
> + struct cr_img img;
> + img._x.fd = fd;
> + bfd_setraw(&img._x);
> + return pb_write_one(&img, obj, type);
> +}
> +
> +int pb_read_obj(int fd, void** pobj, int type)
> +{
> + struct cr_img img;
> + img._x.fd = fd;
> + bfd_setraw(&img._x);
> + return do_pb_read_one(&img, pobj, type, true);
> +}
> +
> +int write_header(int fd, char* namespace, char* path)
> +{
> + RemoteImageEntry ri = REMOTE_IMAGE_ENTRY__INIT;
> + ri.name = path;
> + ri.namespace_ = namespace;
> + return pb_write_obj(fd, &ri, PB_REMOTE_IMAGE);
> +}
> +
> +int read_header(int fd, char* namespace, char* path)
> +{
> + RemoteImageEntry* ri;
> + int ret = pb_read_obj(fd, (void**)&ri, PB_REMOTE_IMAGE);
> + if (ret) {
> + strncpy(namespace, ri->namespace_, PATHLEN);
> + strncpy(path, ri->name, PATHLEN);
> + }
> + free(ri);
> + return ret;
> +}
> +
> +static void add_worker(pthread_t tid)
> +{
> + worker_thread* wthread = malloc(sizeof(worker_thread));
> + if(!wthread) {
> + pr_perror("Unable to allocate worker thread structure");
> + }
> + wthread->tid = tid;
> + pthread_mutex_lock(&workers_lock);
> + list_add_tail(&(wthread->l), &workers_head);
> + pthread_mutex_unlock(&workers_lock);
> + sem_post(&workers_semph);
> +}
> +
> +void join_workers()
> +{
> + worker_thread* wthread = NULL;
> + while (1) {
> + if (list_empty(&workers_head)) {
> + sem_wait(&workers_semph);
> + continue;
> + }
> + wthread = list_entry(workers_head.next, worker_thread, l);
> + if (pthread_join(wthread->tid, NULL)) {
> + pr_perror("Could not join thread %lu", (unsigned long) wthread->tid);
> + }
> + else {
> + list_del(&(wthread->l));
> + free(wthread);
> + }
> +
> + }
> +}
> +
> +void* accept_get_image_connections(void* port)
> +{
> + socklen_t clilen;
> + long cli_fd;
> + pthread_t tid;
> + int get_fd = *((int*) port);
> + struct sockaddr_in cli_addr;
> + clilen = sizeof (cli_addr);
> +
> + while (1) {
> + cli_fd = accept(get_fd, (struct sockaddr *) &cli_addr, &clilen);
> + if (cli_fd < 0) {
> + pr_perror("Unable to accept get image connection");
> + return NULL;
> + }
> +
> + if (pthread_create(
> + &tid, NULL, get_func, (void*) cli_fd)) {
> + pr_perror("Unable to create put thread");
> + return NULL;
> + }
> +
> + add_worker(tid);
> + }
> +}
> +
> +void* accept_put_image_connections(void* port)
> +{
> + socklen_t clilen;
> + int cli_fd;
> + pthread_t tid;
> + int put_fd = *((int*) port);
> + struct sockaddr_in cli_addr;
> + clilen = sizeof(cli_addr);
> + char path_buf[PATHLEN];
> + char namespace_buf[PATHLEN];
> +
> + while (1) {
> +
> + cli_fd = accept(put_fd, (struct sockaddr *) &cli_addr, &clilen);
> + if (cli_fd < 0) {
> + pr_perror("Unable to accept put image connection");
> + return NULL;
> + }
> +
> + if (read_header(cli_fd, namespace_buf, path_buf) < 0) {
> + pr_perror("Error reading header");
> + continue;
> + }
> +
> + remote_image* rimg = get_rimg_by_name(namespace_buf, path_buf);
> +
> + pr_info("Reveiced PUT request for %s:%s\n", path_buf, namespace_buf);
> +
> + if (rimg == NULL) {
> + rimg = malloc(sizeof (remote_image));
> + if (rimg == NULL) {
> + pr_perror("Unable to allocate remote_image structures");
> + return NULL;
> + }
> +
> + remote_buffer* buf = malloc(sizeof (remote_buffer));
> + if (buf == NULL) {
> + pr_perror("Unable to allocate remote_buffer structures");
> + return NULL;
> + }
> +
> + strncpy(rimg->path, path_buf, PATHLEN);
> + strncpy(rimg->namespace, namespace_buf, PATHLEN);
> + buf->nbytes = 0;
> + INIT_LIST_HEAD(&(rimg->buf_head));
> + list_add_tail(&(buf->l), &(rimg->buf_head));
> + }
> + /* NOTE: we implement a PUT by clearing the previous file. */
> + else {
> + pr_info("Clearing previous images for %s:%s\n",
> + path_buf, namespace_buf);
> + pthread_mutex_lock(&rimg_lock);
> + list_del(&(rimg->l));
> + pthread_mutex_unlock(&rimg_lock);
> + while (!list_is_singular(&(rimg->buf_head))) {
> + list_del(rimg->buf_head.prev);
> + }
> + list_entry(rimg->buf_head.next, remote_buffer, l)->nbytes = 0;
> + }
> + rimg->src_fd = cli_fd;
> + rimg->dst_fd = -1;
> +
> + if (pthread_create(
> + &tid, NULL, put_func, (void*) rimg)) {
> + pr_perror("Unable to create put thread");
> + return NULL;
> + }
> +
> + pr_info("Serving PUT request for %s:%s (tid=%lu)\n",
> + rimg->path, rimg->namespace, (unsigned long) tid);
> +
> + add_worker(tid);
> +
> + if (!strncmp(path_buf, DUMP_FINISH, sizeof (DUMP_FINISH))) {
> + finished = 1;
> + pr_info("Received DUMP FINISH\n");
> + sem_post(&rimg_semph);
> + }
> + }
> +}
> +
> +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
This routine makes sense only on the cache side, am I right? If yes, it would be nice
to split the code into different .c files so that when reading the code it is clear
which one of the cache-proxy pair we're at.
> +{
> + remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> + int n, nblocks = 0;
> + uint64_t image_bytes = 0;
> + uint64_t recv_bytes = 0;
> +
> + if (image_check && (read(fd, &image_bytes, sizeof(uint64_t)) != sizeof(uint64_t))) {
> + pr_perror("Unable to read size of the image %s", path);
> + close(fd);
> + return -1;
> + }
> +
> + while(1) {
> + n = read(fd,
> + curr_buf->buffer + curr_buf->nbytes,
> + BUF_SIZE - curr_buf->nbytes);
> + if (n == 0 && image_check && image_bytes != recv_bytes) {
> + pr_info("Failed receiving %s (received %lu instead of %lu)\n",
> + path, recv_bytes, image_bytes);
> + close(fd);
> + return -1;
> + }
> + else if (n == 0) {
> + pr_info("Finished receiving %s (%d full blocks, %d bytes on last block)\n",
> + path, nblocks, curr_buf->nbytes);
> + close(fd);
> + return recv_bytes;
> + }
> + else if (n > 0) {
> + curr_buf->nbytes += n;
> + recv_bytes = nblocks*BUF_SIZE + curr_buf->nbytes;
> + if(curr_buf->nbytes == BUF_SIZE) {
> + remote_buffer* buf = malloc(sizeof(remote_buffer));
> + if (buf == NULL) {
> + pr_perror("Unable to allocate remote_buffer structures");
> + close(fd);
> + return -1;
> + }
> + buf->nbytes = 0;
> + list_add_tail(&(buf->l), rbuff_head);
> + curr_buf = buf;
> + nblocks++;
> + }
> + }
> + else {
> + pr_perror("Read on %s socket failed", path);
> + close(fd);
> + return -1;
> + }
> + }
> +}
> +
> +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check)
And this one is opposite -- for proxy.
> +{
> + remote_buffer* curr_buf = list_entry(rbuff_head->next, remote_buffer, l);
> + int n, curr_offset, nblocks;
> +
> + nblocks = 0;
> + curr_offset = 0;
> +
> + if (image_check) {
> + uint64_t image_bytes = sizeof_remote_buffer(rbuff_head);
> + n = send(fd, &image_bytes, sizeof(uint64_t), MSG_NOSIGNAL);
> + if (n < sizeof(uint64_t)) {
> + pr_perror("Unable to send size of the image %s", path);
> + close(fd);
> + return -1;
> + }
> + }
> +
> + while(1) {
> + n = send(
> + fd,
> + curr_buf->buffer + curr_offset,
> + MIN(BUF_SIZE, curr_buf->nbytes) - curr_offset,
> + MSG_NOSIGNAL);
> + if (n > -1) {
> + curr_offset += n;
> + if (curr_offset == BUF_SIZE) {
> + curr_buf =
> + list_entry(curr_buf->l.next, remote_buffer, l);
> + nblocks++;
> + curr_offset = 0;
> + }
> + else if (curr_offset == curr_buf->nbytes) {
> + pr_info("Finished forwarding %s (%d full blocks, %d bytes on last block)\n",
> + path, nblocks, curr_offset);
> + close(fd);
> + return nblocks*BUF_SIZE + curr_buf->nbytes;
> + }
> + }
> + else if (errno == EPIPE || errno == ECONNRESET) {
> + pr_warn("Connection for %s was closed early than expected\n",
> + path);
> + return 0;
> + }
> + else {
> + pr_perror("Write on %s socket failed", path);
> + return -1;
> + }
> + }
> diff --git a/include/image-remote-pvt.h b/include/image-remote-pvt.h
> new file mode 100644
> index 0000000..0a20e6d
> --- /dev/null
> +++ b/include/image-remote-pvt.h
> @@ -0,0 +1,52 @@
> +#ifndef IMAGE_REMOTE_PVT_H
> +#define IMAGE_REMOTE_PVT_H
> +
> +#include <stdbool.h>
> +#include "list.h"
> +#include "image-remote.h"
> +
> +#define DEFAULT_LISTEN 50
> +#define PAGESIZE 4096
> +#define BUF_SIZE PAGESIZE
> +
> +typedef struct rbuf {
> + char buffer[BUF_SIZE];
> + int nbytes; /* How many bytes are in the buffer. */
> + struct list_head l;
> +} remote_buffer;
Minor, but still. I'd prefer plain structures as datatypes for objects that
are used internally in some subsystem.
> +
> +typedef struct rimg {
> + char path[PATHLEN];
> + char namespace[PATHLEN];
> + int src_fd;
> + int dst_fd;
> + struct list_head l;
> + struct list_head buf_head;
> +
> +} remote_image;
> +
> +int init_daemon(void *(*put_function) (void *));
> +
> +void join_workers();
> +
> +void prepare_put_rimg();
> +void finalize_put_rimg(remote_image* rimg);
> +
> +void* accept_get_image_connections(void* port);
> +void* accept_put_image_connections(void* port);
> +
> +int send_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> +int recv_remote_image(int fd, char* path, struct list_head* rbuff_head, bool image_check);
> +
> +int pb_write_obj(int fd, void* obj, int type);
> +int pb_read_obj(int fd, void** obj, int type);
> +
> +int write_header(int fd, char* namespace, char* path);
> +int read_header(int fd, char* namespace, char* path);
> +
> +int setup_TCP_server_socket(int port);
> +int setup_TCP_client_socket(char* hostname, int port);
> +int setup_UNIX_client_socket(char* path);
> +int setup_UNIX_server_socket(char* path);
> +#endif /* IMAGE_REMOTE_INTERNAL_H */
> +
>
-- Pavel
More information about the CRIU
mailing list