[CRIU] [PATCH 3/5] Non-blocking (epoll-based) implementation of img cache and proxy; seems to be working.
rodrigo-bruno
rbruno at gsd.inesc-id.pt
Mon May 14 03:29:49 MSK 2018
From: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
---
criu/img-cache.c | 137 +-----
criu/img-proxy.c | 65 +--
criu/img-remote.c | 961 +++++++++++++++++++++++---------------
criu/include/img-remote.h | 53 +--
4 files changed, 629 insertions(+), 587 deletions(-)
diff --git a/criu/img-cache.c b/criu/img-cache.c
index 7b828b9b..c941f14e 100644
--- a/criu/img-cache.c
+++ b/criu/img-cache.c
@@ -8,105 +8,25 @@
#include <fcntl.h>
#include "cr_options.h"
-static struct rimage *wait_for_image(struct wthread *wt)
+int accept_proxy_to_cache(int sockfd) /* accepts one connection from the image proxy; returns its fd or -1 */
{
- struct rimage *result;
+ struct sockaddr_in cli_addr;
+ socklen_t clilen = sizeof(cli_addr);
+ int proxy_fd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
- if (!strncmp(wt->path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
- finished = true;
- shutdown(local_req_fd, SHUT_RD);
- return NULL;
- }
-
- result = get_rimg_by_name(wt->snapshot_id, wt->path);
- if (result != NULL && result->size > 0)
- return result;
-
- /* The file does not exist and we do not expect new files */
- if (finished && !is_receiving())
- return NULL;
-
- /* NOTE: at this point, when the thread wakes up, either the image is
- * already in memory or it will never come (the dump is finished).
- */
- sem_wait(&(wt->wakeup_sem));
- result = get_rimg_by_name(wt->snapshot_id, wt->path);
- if (result != NULL && result->size > 0)
- return result;
- else
- return NULL;
-}
-
-/* The image cache creates a thread that calls this function. It waits for remote
- * images from the image-cache.
- */
-void *accept_remote_image_connections(void *port)
-{
- int fd = *((int *) port);
- struct sockaddr_in cli_addr;
- socklen_t clilen = sizeof(cli_addr);
- char snapshot_id_buf[PATHLEN], path_buf[PATHLEN];
- uint64_t size;
- int64_t ret;
- int flags, proxy_fd;
- struct rimage *rimg;
-
- proxy_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
- if (proxy_fd < 0) {
- pr_perror("Unable to accept remote image connection from image proxy");
- return NULL;
- }
- while (1) {
- ret = read_remote_header(proxy_fd, snapshot_id_buf, path_buf, &flags, &size);
- if (ret < 0) {
- pr_perror("Unable to receive remote header from image proxy");
- return NULL;
- }
- /* This means that the no more images are coming. */
- else if (!ret) {
- pr_info("Image Proxy connection closed.\n");
- finished = true;
- unlock_workers();
- return NULL;
- }
-
- pr_info("Received %s request for %s:%s\n",
- flags == O_RDONLY ? "read" :
- flags == O_APPEND ? "append" : "write",
- path_buf, snapshot_id_buf);
+ if (proxy_fd < 0) {
+ pr_perror("Unable to accept remote image connection from image proxy");
+ return -1;
+ }
- rimg = prepare_remote_image(path_buf, snapshot_id_buf, flags);
-
- prepare_recv_rimg();
- if (!size)
- ret = 0;
- else
- ret = recv_image(proxy_fd, rimg, size, flags, false);
- if (ret < 0) {
- pr_perror("Unable to receive %s:%s from image proxy",
- rimg->path, rimg->snapshot_id);
- finalize_recv_rimg(NULL);
- return NULL;
- } else if (ret != size) {
- pr_perror("Unable to receive %s:%s from image proxy (received %ld bytes, expected %lu bytes)",
- rimg->path, rimg->snapshot_id, (long)ret, (unsigned long)size);
- finalize_recv_rimg(NULL);
- return NULL;
- }
- finalize_recv_rimg(rimg);
-
- pr_info("Finished receiving %s:%s (received %ld bytes)\n",
- rimg->path, rimg->snapshot_id, (long)ret);
- }
+ return proxy_fd;
}
int image_cache(bool background, char *local_cache_path, unsigned short cache_write_port)
{
- pthread_t local_req_thr, remote_req_thr;
-
pr_info("Proxy to Cache Port %d, CRIU to Cache Path %s\n",
cache_write_port, local_cache_path);
-
+ restoring = true; /* cache side: local read requests are restore requests */
if (opts.ps_socket != -1) {
proxy_to_cache_fd = opts.ps_socket;
@@ -117,40 +37,29 @@ int image_cache(bool background, char *local_cache_path, unsigned short cache_wr
pr_perror("Unable to open proxy to cache TCP socket");
return -1;
}
+ // Wait to accept connection from proxy.
+ proxy_to_cache_fd = accept_proxy_to_cache(proxy_to_cache_fd); /* NOTE(review): the listening fd is overwritten here and never closed */
+ if (proxy_to_cache_fd < 0)
+ return -1; // TODO - should close other sockets.
}
+ pr_info("Cache is connected to Proxy through fd %d\n", proxy_to_cache_fd);
+
local_req_fd = setup_UNIX_server_socket(local_cache_path);
if (local_req_fd < 0) {
pr_perror("Unable to open cache to proxy UNIX socket");
- return -1;
- }
-
- socket_set_non_blocking(local_req_fd);
+ return -1; // TODO - should close other sockets.
- if (init_daemon(background, wait_for_image)) {
- pr_perror("Unable to initialize daemon");
- return -1;
}
- if (pthread_create(
- &remote_req_thr,
- NULL, accept_remote_image_connections,
- (void *) &proxy_to_cache_fd)) {
- pr_perror("Unable to create remote requests thread");
- return -1;
- }
- if (pthread_create(
- &local_req_thr,
- NULL,
- accept_local_image_connections,
- (void *) &local_req_fd)) {
- pr_perror("Unable to create local requests thread");
- return -1;
+ if (background) {
+ if (daemon(1, 0) == -1) {
+ pr_perror("Can't run service server in the background");
+ return -1;
+ }
}
- pthread_join(remote_req_thr, NULL);
- pthread_join(local_req_thr, NULL);
- join_workers();
+ accept_image_connections(); /* single-threaded epoll event loop (replaces the worker threads) */
pr_info("Finished image cache.");
return 0;
}
diff --git a/criu/img-proxy.c b/criu/img-proxy.c
index b63d69a0..9551a7dc 100644
--- a/criu/img-proxy.c
+++ b/criu/img-proxy.c
@@ -8,51 +8,11 @@
#include <sys/socket.h>
#include "cr_options.h"
-static struct rimage *wait_for_image(struct wthread *wt)
-{
- return get_rimg_by_name(wt->snapshot_id, wt->path);
-}
-
-int64_t forward_image(struct rimage *rimg)
-{
- int64_t ret;
- int fd = proxy_to_cache_fd;
-
- pthread_mutex_lock(&(rimg->in_use));
- pr_info("Forwarding %s:%s (%lu bytes)\n",
- rimg->path, rimg->snapshot_id, (unsigned long)rimg->size);
- if (write_remote_header(
- fd, rimg->snapshot_id, rimg->path, O_APPEND, rimg->size) < 0) {
- pr_perror("Error writing header for %s:%s",
- rimg->path, rimg->snapshot_id);
- pthread_mutex_unlock(&(rimg->in_use));
- return -1;
- }
-
- ret = send_image(fd, rimg, O_APPEND, false);
- if (ret < 0) {
- pr_perror("Unable to send %s:%s to image cache",
- rimg->path, rimg->snapshot_id);
- pthread_mutex_unlock(&(rimg->in_use));
- return -1;
- } else if (ret != rimg->size) {
- pr_perror("Unable to send %s:%s to image proxy (sent %ld bytes, expected %lu bytes",
- rimg->path, rimg->snapshot_id, (long)ret, (unsigned long)rimg->size);
- pthread_mutex_unlock(&(rimg->in_use));
- return -1;
- }
- pr_info("Finished forwarding %s:%s (sent %lu bytes)\n",
- rimg->path, rimg->snapshot_id, (unsigned long)rimg->size);
- pthread_mutex_unlock(&(rimg->in_use));
- return ret;
-}
-
int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigned short fwd_port)
{
- pthread_t local_req_thr;
-
pr_info("CRIU to Proxy Path: %s, Cache Address %s:%hu\n",
local_proxy_path, fwd_host, fwd_port);
+ restoring = false; /* proxy side: local read requests are served while dumping */
local_req_fd = setup_UNIX_server_socket(local_proxy_path);
if (local_req_fd < 0) {
@@ -60,8 +20,6 @@ int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigne
return -1;
}
- socket_set_non_blocking(local_req_fd);
-
if (opts.ps_socket != -1) {
proxy_to_cache_fd = opts.ps_socket;
pr_info("Re-using ps socket %d\n", proxy_to_cache_fd);
@@ -69,24 +27,21 @@ int image_proxy(bool background, char *local_proxy_path, char *fwd_host, unsigne
proxy_to_cache_fd = setup_TCP_client_socket(fwd_host, fwd_port);
if (proxy_to_cache_fd < 0) {
pr_perror("Unable to open proxy to cache TCP socket");
- return -1;
+ return -1; // TODO - should close other sockets (local_req_fd is still open here).
}
}
- if (init_daemon(background, wait_for_image))
- return -1;
+ pr_info("Proxy is connected to Cache through fd %d\n", proxy_to_cache_fd);
- if (pthread_create(
- &local_req_thr,
- NULL,
- accept_local_image_connections,
- (void *) &local_req_fd)) {
- pr_perror("Unable to create local requests thread");
- return -1;
+ if (background) {
+ if (daemon(1, 0) == -1) {
+ pr_perror("Can't run service server in the background");
+ return -1;
+ }
}
- pthread_join(local_req_thr, NULL);
- join_workers();
+ // TODO - pass local_req_fd and proxy_to_cache_fd as arguments.
+ accept_image_connections(); /* single-threaded epoll event loop (replaces the worker threads) */
pr_info("Finished image proxy.");
return 0;
}
diff --git a/criu/img-remote.c b/criu/img-remote.c
index 4c566450..ec69bc02 100644
--- a/criu/img-remote.c
+++ b/criu/img-remote.c
@@ -27,29 +27,35 @@
#define PB_LOCAL_IMAGE_SIZE PATHLEN
#define EPOLL_MAX_EVENTS 50
-static char *snapshot_id;
-bool restoring = true;
-
+// List of images already in memory.
LIST_HEAD(rimg_head);
-pthread_mutex_t rimg_lock = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t proxy_to_cache_lock = PTHREAD_MUTEX_INITIALIZER;
+// List of local operations currently in progress.
+LIST_HEAD(rop_inprogress);
-LIST_HEAD(workers_head);
-pthread_mutex_t workers_lock = PTHREAD_MUTEX_INITIALIZER;
-sem_t workers_semph;
+// List of local operations pending (reads on the restore side for images that
+// still haven't arrived).
-struct rimage * (*wait_for_image) (struct wthread *wt);
+LIST_HEAD(rop_pending);
+// List of images waiting to be forwarded. The head of the list is currently
+// being forwarded.
+LIST_HEAD(rop_forwarding);
+
+// List of snapshots (useful when doing incremental restores/dumps).
+LIST_HEAD(snapshot_head);
-bool finished = false;
-int writing = 0;
-int forwarding = 0;
+static char *snapshot_id;
+bool restoring = true; // true on the cache (restore) side, false on the proxy; TODO - check where this is used!
+// TODO - split this into two vars, recv_from_proxy, send_to_cache
+bool forwarding = false; // TODO - true if proxy_to_cache_fd is being used.
+bool finished_local = false; // set by finish_local()
+bool finished_remote = false; // set once the proxy closes its connection to the cache
int proxy_to_cache_fd;
int local_req_fd;
+int epoll_fd; // created in accept_image_connections()
+struct epoll_event *events; // EPOLL_MAX_EVENTS entries, allocated in accept_image_connections()
-LIST_HEAD(snapshot_head);
-
/* A snapshot is a dump or pre-dump operation. Each snapshot is identified by an
* ID which corresponds to the working directory specefied by the user.
*/
@@ -79,125 +85,27 @@ struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path)
{
struct rimage *rimg = NULL;
- pthread_mutex_lock(&rimg_lock);
list_for_each_entry(rimg, &rimg_head, l) {
if (!strncmp(rimg->path, path, PATHLEN) &&
!strncmp(rimg->snapshot_id, snapshot_id, PATHLEN)) {
- pthread_mutex_unlock(&rimg_lock);
return rimg;
}
}
- pthread_mutex_unlock(&rimg_lock);
- return NULL;
-}
-
-static struct wthread *get_wt_by_name(const char *snapshot_id, const char *path)
-{
- struct wthread *wt = NULL;
-
- pthread_mutex_lock(&workers_lock);
- list_for_each_entry(wt, &workers_head, l) {
- if (!strncmp(wt->path, path, PATHLEN) &&
- !strncmp(wt->snapshot_id, snapshot_id, PATHLEN)) {
- pthread_mutex_unlock(&workers_lock);
- return wt;
- }
- }
- pthread_mutex_unlock(&workers_lock);
return NULL;
}
-static int init_sync_structures(void)
-{
- if (sem_init(&workers_semph, 0, 0) != 0) {
- pr_perror("Workers semaphore init failed");
- return -1;
- }
-
- return 0;
-}
-
-void prepare_recv_rimg(void)
-{
- pthread_mutex_lock(&rimg_lock);
- writing++;
- pthread_mutex_unlock(&rimg_lock);
-}
-
-void finalize_recv_rimg(struct rimage *rimg)
-{
-
- pthread_mutex_lock(&rimg_lock);
-
- if (rimg)
- list_add_tail(&(rimg->l), &rimg_head);
- writing--;
- pthread_mutex_unlock(&rimg_lock);
- /* Wake thread waiting for this image. */
- if (rimg) {
- struct wthread *wt = get_wt_by_name(rimg->snapshot_id, rimg->path);
- if (wt)
- sem_post(&(wt->wakeup_sem));
- }
-}
-
-bool is_receiving(void)
-{
- int ret;
-
- pthread_mutex_lock(&rimg_lock);
- ret = writing;
- pthread_mutex_unlock(&rimg_lock);
- return ret > 0;
-}
-
-static void prepare_fwd_rimg(void)
-{
- pthread_mutex_lock(&rimg_lock);
- forwarding++;
- pthread_mutex_unlock(&rimg_lock);
-}
-
-static void finalize_fwd_rimg(void)
-{
- pthread_mutex_lock(&rimg_lock);
- forwarding--;
- pthread_mutex_unlock(&rimg_lock);
-}
-
-static bool is_forwarding(void)
-{
- int ret;
-
- pthread_mutex_lock(&rimg_lock);
- ret = forwarding;
- pthread_mutex_unlock(&rimg_lock);
- return ret > 0;
-}
-
-/* This function is called when no more images are coming. Threads still waiting
- * for images will be awaken to send a ENOENT (no such file) to the requester.
- */
-void unlock_workers(void)
+struct roperation *get_rop_by_name(
+ struct list_head *head, const char *snapshot_id, const char *path) /* first rop on head matching path and snapshot_id, or NULL */
{
- struct wthread *wt = NULL;
-
- pthread_mutex_lock(&workers_lock);
- list_for_each_entry(wt, &workers_head, l)
- sem_post(&(wt->wakeup_sem));
- pthread_mutex_unlock(&workers_lock);
-}
+ struct roperation *rop = NULL;
-int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*))
-{
- if (background) {
- if (daemon(1, 0) == -1) {
- pr_perror("Can't run service server in the background");
- return -1;
+ list_for_each_entry(rop, head, l) {
+ if (!strncmp(rop->path, path, PATHLEN) &&
+ !strncmp(rop->snapshot_id, snapshot_id, PATHLEN)) {
+ return rop;
}
}
- wait_for_image = wfi;
- return init_sync_structures();
+ return NULL;
}
int setup_TCP_server_socket(int port)
@@ -278,11 +186,15 @@ err:
int event_set(int epoll_fd, int op, int fd, uint32_t events, void *data)
{
+ int ret;
struct epoll_event event;
event.events = events;
event.data.ptr = data;
- // TODO - check if this is okay to send a stack allocated object!
- return epoll_ctl(epoll_fd, op, fd, &event);
+
+ ret = epoll_ctl(epoll_fd, op, fd, &event); /* epoll_ctl copies *event, so a stack object is fine */
+ if (ret)
+ pr_perror("[fd=%d] Unable to set event", fd);
+ return ret; /* 0 on success, -1 on failure (already logged) */
}
void socket_set_non_blocking(int fd)
@@ -299,6 +211,20 @@ void socket_set_non_blocking(int fd)
pr_perror("Failed to set flags for fd %d", fd);
}
+void socket_set_blocking(int fd) /* inverse of socket_set_non_blocking(): clear O_NONBLOCK; errors are only logged */
+{
+ int flags = fcntl(fd, F_GETFL, NULL);
+
+ if (flags < 0) {
+ pr_perror("Failed to obtain flags from fd %d", fd);
+ return;
+ }
+ flags &= (~O_NONBLOCK);
+
+ if (fcntl(fd, F_SETFL, flags) < 0)
+ pr_perror("Failed to set flags for fd %d", fd);
+}
+
int setup_UNIX_server_socket(char *path)
{
struct sockaddr_un addr;
@@ -441,48 +367,10 @@ int64_t read_remote_header(int fd, char *snapshot_id, char *path, int *flags, ui
return ret;
}
-static struct wthread *new_worker(void)
-{
- struct wthread *wt = malloc(sizeof(struct wthread));
-
- if (!wt) {
- pr_perror("Unable to allocate worker thread structure");
- goto err;
- }
- if (sem_init(&(wt->wakeup_sem), 0, 0) != 0) {
- pr_perror("Workers semaphore init failed");
- goto err;
- }
- return wt;
-err:
- free(wt);
- return NULL;
-}
-
-static void add_worker(struct wthread *wt)
-{
- pthread_mutex_lock(&workers_lock);
- list_add_tail(&(wt->l), &workers_head);
- pthread_mutex_unlock(&workers_lock);
- sem_post(&workers_semph);
-}
-
-void join_workers(void)
-{
- struct wthread *wthread = NULL;
-
- while (! list_empty(&workers_head)) {
- wthread = list_entry(workers_head.next, struct wthread, l);
- pthread_join(wthread->tid, NULL);
- list_del(&(wthread->l));
- free(wthread);
- }
-}
-
static struct rimage *new_remote_image(char *path, char *snapshot_id)
{
- struct rimage *rimg = malloc(sizeof(struct rimage));
- struct rbuf *buf = malloc(sizeof(struct rbuf));
+ struct rimage *rimg = calloc(1, sizeof(struct rimage)); /* zero-filled, so size starts at 0 */
+ struct rbuf *buf = calloc(1, sizeof(struct rbuf)); /* zero-filled, so nbytes starts at 0 */
if (rimg == NULL || buf == NULL) {
pr_perror("Unable to allocate remote image structures");
@@ -490,18 +378,13 @@ static struct rimage *new_remote_image(char *path, char *snapshot_id)
}
strncpy(rimg->path, path, PATHLEN -1 );
- rimg->path[PATHLEN - 1] = '\0';
strncpy(rimg->snapshot_id, snapshot_id, PATHLEN - 1);
+ rimg->path[PATHLEN - 1] = '\0';
rimg->snapshot_id[PATHLEN - 1] = '\0';
- rimg->size = 0;
- buf->nbytes = 0;
INIT_LIST_HEAD(&(rimg->buf_head));
list_add_tail(&(buf->l), &(rimg->buf_head));
+ rimg->curr_fwd_buf = buf; /* forwarding to the cache starts at the first buffer */
- if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
- pr_err("Remote image in_use mutex init failed\n");
- goto err;
- }
return rimg;
err:
free(rimg);
@@ -509,11 +392,56 @@ err:
return NULL;
}
+static struct roperation *new_remote_operation( /* zero-filled rop; the image is attached later by rop_set_rimg() */
+ char *path, char *snapshot_id, int cli_fd, int flags, bool close_fd)
+{
+ struct roperation *rop = calloc(1, sizeof(struct roperation));
+
+ if (rop == NULL) {
+ pr_perror("Unable to allocate remote operation structures");
+ return NULL;
+ }
+ strncpy(rop->path, path, PATHLEN -1 );
+ strncpy(rop->snapshot_id, snapshot_id, PATHLEN - 1);
+ rop->path[PATHLEN - 1] = '\0';
+ rop->snapshot_id[PATHLEN - 1] = '\0';
+ rop->fd = cli_fd;
+ rop->flags = flags;
+ rop->close_fd = close_fd;
+
+ return rop;
+}
+
+static void rop_set_rimg(struct roperation* rop, struct rimage* rimg) /* attach rimg and position rop's send/receive cursors */
+{
+ rop->rimg = rimg;
+ rop->size = rimg->size;
+ if (rop->flags == O_APPEND) {
+ // Image forward on append must start where the last fwd finished.
+ if (rop->fd == proxy_to_cache_fd) {
+ rop->curr_sent_buf = rimg->curr_fwd_buf;
+ rop->curr_sent_bytes = rimg->curr_fwd_bytes;
+ }
+ // For local appends, just write at the end.
+ else {
+ rop->curr_sent_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+ rop->curr_sent_bytes = rop->curr_sent_buf->nbytes;
+ }
+ // On the receiver side, we just append
+ rop->curr_recv_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
+ }
+ else {
+ // Writes or reads are simple. Just do it from the beginning.
+ rop->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+ rop->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+ rop->curr_sent_bytes = 0;
+
+ }
+}
+
/* Clears a remote image struct for reusing it. */
static struct rimage *clear_remote_image(struct rimage *rimg)
{
- pthread_mutex_lock(&(rimg->in_use));
-
while (!list_is_singular(&(rimg->buf_head))) {
struct rbuf *buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
@@ -524,179 +452,508 @@ static struct rimage *clear_remote_image(struct rimage *rimg)
list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
rimg->size = 0;
- pthread_mutex_unlock(&(rimg->in_use));
-
return rimg;
}
-struct rimage *prepare_remote_image(char *path, char *snapshot_id, int open_mode)
+/* Start receiving an image (write or append) from cli_fd.
+ * The image is detached from rimg_head while it is written; a new operation
+ * is queued on rop_inprogress and cli_fd is polled for input. On failure
+ * cli_fd is closed when close_fd is set.
+ */
+void handle_accept_write(
+ int cli_fd, char* snapshot_id, char* path, int flags, bool close_fd, uint64_t size)
{
+ struct roperation *rop = NULL;
struct rimage *rimg = get_rimg_by_name(snapshot_id, path);
- /* There is no record of such image, create a new one. */
- if (rimg == NULL)
- return new_remote_image(path, snapshot_id);
-
- pthread_mutex_lock(&rimg_lock);
- list_del(&(rimg->l));
- pthread_mutex_unlock(&rimg_lock);
+ // Allocate the operation first so that a failure cannot leave an
+ // existing image detached from rimg_head.
+ rop = new_remote_operation(path, snapshot_id, cli_fd, flags, close_fd);
+ if (rop == NULL) {
+ pr_perror("Error preparing remote operation");
+ goto err;
+ }
+
+ if (rimg == NULL) {
+ rimg = new_remote_image(path, snapshot_id);
+ if (rimg == NULL) {
+ pr_perror("Error preparing remote image");
+ goto err;
+ }
+ }
+ else {
+ list_del(&(rimg->l));
+ // A write replaces the existing content; an append extends it.
+ if (flags != O_APPEND)
+ clear_remote_image(rimg);
+ }
+
+ rop_set_rimg(rop, rimg);
+ rop->size = size;
+ list_add_tail(&(rop->l), &rop_inprogress);
+ event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
+ return;
+err:
+ free(rop);
+ if (close_fd)
+ close(cli_fd);
+}
- /* There is already an image record. Simply return it for appending. */
- if (open_mode == O_APPEND)
- return rimg;
- /* There is already an image record. Clear it for writing. */
- else
- return clear_remote_image(rimg);
+
+/* Local write/append request on the proxy (close_fd = true, size = 0). */
+void handle_accept_proxy_write(
+ int cli_fd, char* snapshot_id, char* path, int flags)
+{
+ handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
}
-static void *process_local_read(struct wthread *wt)
+
+/* Local read request on the proxy (dump side). The proxy never waits for an
+ * image: if it is not in rimg_head, ENOENT is replied and cli_fd is closed.
+ */
+void handle_accept_proxy_read(
+ int cli_fd, char* snapshot_id, char* path, int flags)
{
+ struct roperation *rop = NULL;
struct rimage *rimg = NULL;
- int64_t ret;
- /* TODO - split wait_for_image
- * in cache - improve the parent stuf
- * in proxy - do not wait for anything, return no file
- */
- rimg = wait_for_image(wt);
- if (!rimg) {
- pr_info("No image %s:%s.\n", wt->path, wt->snapshot_id);
- if (write_reply_header(wt->fd, ENOENT) < 0)
+
+ rimg = get_rimg_by_name(snapshot_id, path);
+
+ // Check if we already have the image.
+ if (rimg == NULL) {
+ pr_info("No image %s:%s.\n", path, snapshot_id);
+ if (write_reply_header(cli_fd, ENOENT) < 0)
pr_perror("Error writing reply header for unexisting image");
- close(wt->fd);
- return NULL;
- } else {
- if (write_reply_header(wt->fd, 0) < 0) {
+ goto out_close;
+ }
+
+ if (write_reply_header(cli_fd, 0) < 0) {
pr_perror("Error writing reply header for %s:%s",
- wt->path, wt->snapshot_id);
- close(wt->fd);
- return NULL;
+ path, snapshot_id);
+ goto out_close;
}
- }
- pthread_mutex_lock(&(rimg->in_use));
- ret = send_image(wt->fd, rimg, wt->flags, true);
- if (ret < 0)
- pr_perror("Unable to send %s:%s to CRIU (sent %ld bytes)",
- rimg->path, rimg->snapshot_id, (long)ret);
- else
- pr_info("Finished sending %s:%s to CRIU (sent %ld bytes)\n",
- rimg->path, rimg->snapshot_id, (long)ret);
- pthread_mutex_unlock(&(rimg->in_use));
- return NULL;
+ rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
+ if (rop == NULL) {
+ pr_perror("Error preparing remote operation");
+ goto out_close;
+ }
+ rop_set_rimg(rop, rimg);
+ list_add_tail(&(rop->l), &rop_inprogress);
+ event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+ return;
+out_close:
+ close(cli_fd);
}
-static void *process_local_image_connection(void *ptr)
+
+/* Stop accepting local connections: called once the local side has sent its
+ * finish marker (RESTORE_FINISH on the cache, DUMP_FINISH on the proxy). */
+void finish_local(void)
{
- struct wthread *wt = (struct wthread *) ptr;
- struct rimage *rimg = NULL;
- int64_t ret;
+ int ret;
+
+ finished_local = true;
+ // TODO - is shutdown(local_req_fd, SHUT_RD) still needed here?
+ ret = event_set(epoll_fd, EPOLL_CTL_DEL, local_req_fd, 0, 0);
+ if (ret)
+ pr_perror("Failed to del local fd from epoll");
+}
- /* NOTE: the code inside this if is shared for both cache and proxy. */
- if (wt->flags == O_RDONLY)
- return process_local_read(wt);
+
+/* Local read request on the cache (restore side). The image is served at
+ * once if already received, parked on rop_pending while the proxy may still
+ * send it, and answered with ENOENT otherwise. RESTORE_FINISH ends local
+ * processing instead.
+ */
+void handle_accept_cache_read(
+ int cli_fd, char* snapshot_id, char* path, int flags)
+{
+ struct rimage *rimg = NULL;
+ struct roperation *rop = NULL;
- /* NOTE: IMAGE PROXY ONLY. The image cache receives write connections
- * through TCP (see accept_remote_image_connections).
- */
- rimg = prepare_remote_image(wt->path, wt->snapshot_id, wt->flags);
- ret = recv_image(wt->fd, rimg, 0, wt->flags, true);
- if (ret < 0) {
- pr_perror("Unable to receive %s:%s to CRIU (received %ld bytes)",
- rimg->path, rimg->snapshot_id, (long)ret);
- finalize_recv_rimg(NULL);
- return NULL;
+ // Check if this is the restore finish message.
+ if (!strncmp(path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
+ close(cli_fd);
+ finish_local();
+ return;
}
- finalize_recv_rimg(rimg);
- pr_info("Finished receiving %s:%s (received %ld bytes)\n",
- rimg->path, rimg->snapshot_id, (long)ret);
+ rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
+ if (rop == NULL) {
+ pr_perror("Error preparing remote operation");
+ close(cli_fd);
+ return;
+ }
- if (!strncmp(rimg->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
- finished = true;
- shutdown(local_req_fd, SHUT_RD);
- } else {
- pthread_mutex_lock(&proxy_to_cache_lock);
- ret = forward_image(rimg);
- pthread_mutex_unlock(&proxy_to_cache_lock);
+ // Check if we already have the image.
+ rimg = get_rimg_by_name(snapshot_id, path);
+ if (rimg != NULL && rimg->size > 0) {
+ if (write_reply_header(cli_fd, 0) < 0) {
+ pr_perror("Error writing reply header for %s:%s",
+ path, snapshot_id);
+ close(cli_fd);
+ free(rop);
+ return;
+ }
+ rop_set_rimg(rop, rimg);
+ list_add_tail(&(rop->l), &rop_inprogress);
+ event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
}
+ // The file may exist in future.
+ else if (!finished_remote) {
+ list_add_tail(&(rop->l), &rop_pending);
+ }
+ // The file does not exist.
+ else {
+ pr_info("No image %s:%s.\n", path, snapshot_id);
+ if (write_reply_header(cli_fd, ENOENT) < 0)
+ pr_perror("Error writing reply header for unexisting image");
+ free(rop);
+ close(cli_fd);
+ }
+}
- finalize_fwd_rimg();
- if (ret < 0) {
- pr_perror("Unable to forward %s:%s to Image Cache",
- rimg->path, rimg->snapshot_id);
+
+/* Proxy side: send the request header for rop on the proxy-to-cache
+ * connection and poll it for output to stream the image data. */
+void forward_remote_image(struct roperation* rop)
+{
+ int64_t ret = 0;
+ // Set blocking during the setup.
+// socket_set_blocking(rop->fd); // TODO - test
- return NULL;
+ ret = write_remote_header(
+ rop->fd, rop->snapshot_id, rop->path, rop->flags, rop->size);
+
+ if (ret < 0) {
+ pr_perror("Error writing header for %s:%s",
+ rop->path, rop->snapshot_id);
+ return;
}
- if (finished && !is_forwarding() && !is_receiving()) {
- pr_info("Closing connection to Image Cache.\n");
- close(proxy_to_cache_fd);
- unlock_workers();
+ pr_info("[fd=%d] Forwarding %s request for %s:%s (%lu bytes)\n",
+ rop->fd,
+ rop->flags == O_RDONLY ? "read" :
+ rop->flags == O_APPEND ? "append" : "write",
+ rop->path, rop->snapshot_id, (unsigned long)rop->size);
+
+ // Go back to non-blocking
+// socket_set_non_blocking(rop->fd); // TODO - test
+
+ forwarding = true;
+ event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+}
+
+/* Cache side: read the next request header sent by the proxy on fd and start
+ * receiving that image. A zero return from read_remote_header means the proxy
+ * closed the connection and no more images are coming.
+ */
+void handle_remote_accept(int fd)
+{
+ char path[PATHLEN];
+ char snapshot_id[PATHLEN];
+ int flags;
+ uint64_t size = 0;
+ int64_t ret;
+
+ // Set blocking during the setup.
+// socket_set_blocking(fd); // TODO - test!
+
+ ret = read_remote_header(fd, snapshot_id, path, &flags, &size);
+ if (ret < 0) {
+ pr_perror("Unable to receive remote header from image proxy");
+ goto err;
}
- return NULL;
+ /* This means that no more images are coming. */
+ else if (!ret) {
+ finished_remote = true;
+ pr_info("Image Proxy connection closed.\n");
+ return;
+ }
+
+ // Go back to non-blocking
+// socket_set_non_blocking(fd); // TODO - test!
+
+ pr_info("[fd=%d] Received %s request for %s:%s with %lu bytes\n",
+ fd,
+ flags == O_RDONLY ? "read" :
+ flags == O_APPEND ? "append" : "write",
+ path, snapshot_id, (unsigned long)size);
+
+ // proxy_to_cache_fd stays busy until this image is fully received.
+ forwarding = true;
+ handle_accept_write(fd, snapshot_id, path, flags, false, size);
+ return;
+err:
+ close(fd);
}
+
+/* Accept a connection on the local UNIX socket, read its request header and
+ * dispatch it according to the request type and the side we run on. */
void handle_local_accept(int fd)
{
- struct wthread *wt = NULL;
int cli_fd;
- pthread_t tid;
+ char path[PATHLEN];
+ char snapshot_id[PATHLEN];
+ int flags = 0;
struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr);
cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
if (cli_fd < 0) {
pr_perror("Unable to accept local image connection");
- goto err;
+ return;
}
- wt = new_worker();
- wt->fd = cli_fd;
-
- if (read_header(wt->fd, wt->snapshot_id, wt->path, &(wt->flags)) < 0) {
+ if (read_header(cli_fd, snapshot_id, path, &flags) < 0) {
pr_err("Error reading local image header\n");
goto err;
}
- /* These function calls are used to avoid other threads from
- * thinking that there are no more images are coming.
- */
- if (wt->flags != O_RDONLY) {
- prepare_recv_rimg();
- prepare_fwd_rimg();
+ pr_info("[fd=%d] Received %s request for %s:%s\n",
+ cli_fd,
+ flags == O_RDONLY ? "read" :
+ flags == O_APPEND ? "append" : "write",
+ path, snapshot_id);
+
+ // Set socket non-blocking before dispatching: the handlers below may
+ // close cli_fd, so it must not be used after they return.
+ socket_set_non_blocking(cli_fd);
+
+ // Write/Append case (only possible in img-proxy).
+ if (flags != O_RDONLY) {
+ handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
+ }
+ // Read case while restoring (img-cache).
+ else if (restoring) {
+ handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
+ }
+ // Read case while dumping (img-proxy).
+ else {
+ handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
}
- pr_info("Received %s request for %s:%s\n",
- wt->flags == O_RDONLY ? "read" :
- wt->flags == O_APPEND ? "append" : "write",
- wt->path, wt->snapshot_id);
+ return;
+err:
+ close(cli_fd);
+}
- if (pthread_create(
- &tid, NULL, process_local_image_connection, (void *) wt)) {
- pr_perror("Unable to create worker thread");
- goto err;
+
+/* Proxy side: a send completed (forward to the cache, or a local read). */
+void finish_proxy_read(struct roperation* rop)
+{
+ // If finished forwarding image
+ if (rop->fd == proxy_to_cache_fd) {
+ // Update fwd buffer and byte count on rimg.
+ rop->rimg->curr_fwd_buf = rop->curr_sent_buf;
+ rop->rimg->curr_fwd_bytes = rop->curr_sent_bytes;
+
+ forwarding = false;
+
+ // If there are images waiting to be forwarded, forward the next.
+ if (!list_empty(&rop_forwarding)) {
+ forward_remote_image(list_entry(rop_forwarding.next, struct roperation, l));
+ }
+ }
+}
+
+/* Proxy side: a local write completed. DUMP_FINISH stops local processing;
+ * any other image is published in rimg_head and queued for forwarding. */
+void finish_proxy_write(struct roperation* rop)
+{
+ struct roperation *rop_to_forward;
+
+ // No more local images are coming. Stop accepting local connections.
+ if (!strncmp(rop->path, DUMP_FINISH, sizeof(DUMP_FINISH))) {
+ // TODO - couldn't we handle DUMP_FINISH inside handle_accept_proxy_write?
+ finish_local();
+ return;
+ }
+
+ // Normal image received: add it to the list of images and forward it.
+ list_add_tail(&(rop->rimg->l), &rimg_head);
+
+ rop_to_forward = new_remote_operation(
+ rop->path, rop->snapshot_id, proxy_to_cache_fd, rop->flags, false);
+ if (rop_to_forward == NULL) {
+ pr_perror("Error preparing forward of %s:%s",
+ rop->path, rop->snapshot_id);
+ return;
+ }
+
+ rop_set_rimg(rop_to_forward, rop->rimg);
+ if (list_empty(&rop_forwarding))
+ forward_remote_image(rop_to_forward);
+ list_add_tail(&(rop_to_forward->l), &rop_forwarding);
+}
+
+/* Cache side: an image from the proxy was fully received. Publish it, resume
+ * reading requests from the proxy and serve a pending local read for it.
+ */
+void finish_cache_write(struct roperation* rop)
+{
+ struct roperation *prop = get_rop_by_name(
+ &rop_pending, rop->snapshot_id, rop->path);
+
+ forwarding = false;
+ event_set(epoll_fd, EPOLL_CTL_ADD, proxy_to_cache_fd, EPOLLIN, &proxy_to_cache_fd);
+
+ // Add image to list of images.
+ list_add_tail(&(rop->rimg->l), &rimg_head);
+
+ // TODO - what if we have multiple requests for the same name?
+ if (prop != NULL) {
+ pr_info("\t[fd=%d] Resuming pending %s for %s:%s\n",
+ prop->fd,
+ prop->flags == O_APPEND ?
+ "append" : prop->flags == O_RDONLY ?
+ "read" : "write",
+ prop->path, prop->snapshot_id);
+
+ // Unlink from rop_pending first so that no path leaves a freed entry
+ // on the list.
+ list_del(&(prop->l));
+
+ // Write header for pending image.
+ if (write_reply_header(prop->fd, 0) < 0) {
+ pr_perror("Error writing reply header for %s:%s",
+ prop->path, prop->snapshot_id);
+ close(prop->fd);
+ free(prop);
+ return;
+ }
+
+ rop_set_rimg(prop, rop->rimg);
+ list_add_tail(&(prop->l), &rop_inprogress);
+ event_set(epoll_fd, EPOLL_CTL_ADD, prop->fd, EPOLLOUT, prop);
+ }
+}
+
+/* Advance a non-blocking transfer after an epoll event on rop->fd. While the
+ * transfer is incomplete, rop->fd is re-armed for the same direction; once it
+ * completes (or fails), rop is unlinked, the side-specific completion handler
+ * runs on success, and rop is freed.
+ */
+void handle_roperation(struct epoll_event *event, struct roperation *rop)
+{
+ // Derive the direction once: rop->fd is registered either for EPOLLOUT
+ // (send) or for EPOLLIN (receive), so both uses below must agree.
+ bool sending = (event->events & EPOLLOUT) != 0;
+ int64_t ret = sending ?
+ send_image_async(rop) :
+ recv_image_async(rop);
+
+ // NOTE(review): EAGAIN/EWOULDBLOCK are positive and thus already covered
+ // by ret > 0; confirm the return convention of the *_image_async helpers.
+ if (ret > 0 || ret == EAGAIN || ret == EWOULDBLOCK) {
+ event_set(
+ epoll_fd,
+ EPOLL_CTL_ADD,
+ rop->fd,
+ sending ? EPOLLOUT : EPOLLIN,
+ rop);
+ return;
}
- wt->tid = tid;
- add_worker(wt);
- return;
+
+ // Remove rop from list (either in progress or forwarding).
+ list_del(&(rop->l));
+
+ // Operation is finished.
+ if (ret < 0) {
+ pr_perror("Unable to %s %s:%s (returned %ld)",
+ sending ? "send" : "receive",
+ rop->rimg->path, rop->rimg->snapshot_id, (long)ret);
+ goto err;
+ } else {
+ pr_info("[fd=%d] Finished %s %s:%s to CRIU (size %ld)\n",
+ rop->fd,
+ sending ? "sending" : "receiving",
+ rop->rimg->path, rop->rimg->snapshot_id, (long)rop->rimg->size);
+ }
+
+ // If receive operation is finished
+ if (!sending) {
+
+ // Cached side (finished receiving forwarded image)
+ if (restoring) {
+ finish_cache_write(rop);
+ }
+ // Proxy side (finished receiving local image)
+ else {
+ finish_proxy_write(rop);
+ }
+ }
+ // If send operation is finished
+ else {
+ // Proxy side (Finished forwarding image or reading it locally).
+ if (!restoring)
+ finish_proxy_read(rop);
+ // Nothing to be done when a read is finished on the cache side.
+ }
err:
- close(cli_fd);
- free(wt);
+ free(rop);
}
+
+/* Proxy side: start forwarding the first queued operation whose image is
+ * available in rimg_head. */
+void check_pending_forwards(void)
+{
+ struct roperation *rop = NULL;
+ struct rimage *rimg = NULL;
+
+ list_for_each_entry(rop, &rop_forwarding, l) {
+ rimg = get_rimg_by_name(rop->snapshot_id, rop->path);
+ if (rimg != NULL) {
+ rop_set_rimg(rop, rimg);
+ forward_remote_image(rop);
+ return;
+ }
+ }
+}
-void *accept_local_image_connections(void *port)
+
+/* Cache side: serve pending local reads whose image has arrived. Each one
+ * gets a success reply header and moves from rop_pending to rop_inprogress,
+ * with its fd polled for output.
+ */
+void check_pending_reads(void)
{
- int fd = *((int *) port);
- int epoll_fd;
- struct epoll_event *events;
+ struct roperation *rop, *tmp;
+ struct rimage *rimg;
+
+ // Entries are unlinked while iterating, hence the _safe variant.
+ list_for_each_entry_safe(rop, tmp, &rop_pending, l) {
+ rimg = get_rimg_by_name(rop->snapshot_id, rop->path);
+ if (rimg == NULL)
+ continue;
+
+ // Leave rop_pending so that the operation is resumed only once.
+ list_del(&(rop->l));
+ if (write_reply_header(rop->fd, 0) < 0) {
+ pr_perror("Error writing reply header for %s:%s",
+ rop->path, rop->snapshot_id);
+ close(rop->fd);
+ free(rop);
+ continue;
+ }
+
+ rop_set_rimg(rop, rimg);
+ list_add_tail(&(rop->l), &rop_inprogress);
+ event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
+ }
+}
+
+void accept_image_connections() {
int ret;
epoll_fd = epoll_create(EPOLL_MAX_EVENTS);
if (epoll_fd < 0) {
pr_perror("Unable to open epoll");
- return NULL;
+ return;
}
events = calloc(EPOLL_MAX_EVENTS, sizeof(struct epoll_event));
@@ -705,57 +882,89 @@ void *accept_local_image_connections(void *port)
goto end;
}
- ret = event_set(epoll_fd, EPOLL_CTL_ADD, fd, EPOLLIN, &fd);
+ ret = event_set(epoll_fd, EPOLL_CTL_ADD, local_req_fd, EPOLLIN, &local_req_fd);
if (ret) {
- pr_perror("Failed to set event for epoll");
+ pr_perror("Failed to add local fd to epoll");
goto end;
}
+ // Only if we are restoring (cache-side) we need to add the remote sock to
+ // the epoll.
+ if (restoring) {
+ ret = event_set(epoll_fd, EPOLL_CTL_ADD, proxy_to_cache_fd,
+ EPOLLIN, &proxy_to_cache_fd);
+ if (ret) {
+ pr_perror("Failed to add proxy to cache fd to epoll");
+ goto end;
+ }
+ }
+
while (1) {
- int n_events = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, -1);
+ int n_events = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, 250);
if (n_events < 0) {
pr_perror("Failed to epoll wait");
goto end;
}
for (int i = 0; i < n_events; i++) {
- if (events[i].data.ptr == &fd) {
+ // Accept from local dump/restore?
+ if (events[i].data.ptr == &local_req_fd) {
if ( events[i].events & EPOLLHUP ||
events[i].events & EPOLLERR) {
- if (!finished)
+ if (!finished_local)
pr_perror("Unable to accept more local image connections");
goto end;
}
- // accept
- pr_perror("Calling accept %d", i);
- handle_local_accept(fd);
+ handle_local_accept(local_req_fd);
}
+ else if (restoring && !forwarding && events[i].data.ptr == &proxy_to_cache_fd) {
+ event_set(epoll_fd, EPOLL_CTL_DEL, proxy_to_cache_fd, 0, 0);
+ handle_remote_accept(proxy_to_cache_fd);
+ }
else {
- // TODO - handle write/read
- pr_perror("Event on unexpected file descripor");
- goto end;
+ struct roperation *rop =
+ (struct roperation*)events[i].data.ptr;
+ event_set(epoll_fd, EPOLL_CTL_DEL, rop->fd, 0, 0);
+ handle_roperation(&events[i], rop);
}
}
- }
+
+ // Check if there are any pending operations
+ if (restoring)
+ check_pending_reads();
+ else if (!forwarding)
+ check_pending_forwards();
+
+ // Check if we can close the tcp socket (this will unblock the cache
+ // to answer "no image" to restore).
+ if (!restoring &&
+ finished_local &&
+ !finished_remote &&
+ list_empty(&rop_forwarding)) {
+ close(proxy_to_cache_fd);
+ finished_remote = true;
+ }
+
+ // If both local and remote sockets are closed, leave.
+ if (finished_local && finished_remote) {
+ pr_info("\tFinished both local and remote, exiting\n");
+ goto end;
+ }
+ }
end:
+ // TODO - release pending when no receiving and finished.
close(epoll_fd);
- close(fd);
+ close(local_req_fd);
free(events);
- return NULL;
}
int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
{
int ret;
- struct roperation *op = malloc(sizeof(struct roperation));
- bzero(op, sizeof(struct roperation));
- op->fd = fd;
- op->rimg = rimg;
- op->size = size;
- op->flags = flags;
- op->close_fd = close_fd;
- op->curr_recv_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+ struct roperation *op = new_remote_operation(
+ rimg->path, rimg->snapshot_id, fd, flags, close_fd);
+ rop_set_rimg(op, rimg);
while ((ret = recv_image_async(op)) < 0)
if (ret != EAGAIN && ret != EWOULDBLOCK)
return -1;
@@ -771,70 +980,58 @@ int64_t recv_image_async(struct roperation *op)
int fd = op->fd;
struct rimage *rimg = op->rimg;
uint64_t size = op->size;
- int flags = op->flags;
bool close_fd = op->close_fd;
struct rbuf *curr_buf = op->curr_recv_buf;
int n;
- if (curr_buf == NULL) {
- if (flags == O_APPEND)
- curr_buf = list_entry(rimg->buf_head.prev, struct rbuf, l);
- else
- curr_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
- }
-
- while (1) {
- n = read(fd,
+ n = read(fd,
curr_buf->buffer + curr_buf->nbytes,
size ?
min((int) (size - rimg->size), BUF_SIZE - curr_buf->nbytes) :
BUF_SIZE - curr_buf->nbytes);
- if (n == 0) {
- if (close_fd)
- close(fd);
- return rimg->size;
- } else if (n > 0) {
- curr_buf->nbytes += n;
- rimg->size += n;
- if (curr_buf->nbytes == BUF_SIZE) {
- struct rbuf *buf = malloc(sizeof(struct rbuf));
- if (buf == NULL) {
- pr_perror("Unable to allocate remote_buffer structures");
- if (close_fd)
- close(fd);
- return -1;
- }
- buf->nbytes = 0;
- list_add_tail(&(buf->l), &(rimg->buf_head));
- curr_buf = buf;
- }
- if (size && rimg->size == size) {
+ if (n == 0) {
+ if (close_fd)
+ close(fd);
+ return n;
+ } else if (n > 0) {
+ curr_buf->nbytes += n;
+ rimg->size += n;
+ if (curr_buf->nbytes == BUF_SIZE) {
+ struct rbuf *buf = malloc(sizeof(struct rbuf));
+ if (buf == NULL) {
+ pr_perror("Unable to allocate remote_buffer structures");
if (close_fd)
close(fd);
- return rimg->size;
+ return -1;
}
- } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
- return errno;
- } else {
- pr_perror("Read on %s:%s socket failed",
- rimg->path, rimg->snapshot_id);
+ buf->nbytes = 0;
+ list_add_tail(&(buf->l), &(rimg->buf_head));
+ op->curr_recv_buf = buf;
+ return n;
+ }
+ if (size && rimg->size == size) {
if (close_fd)
close(fd);
- return -1;
+ return 0;
}
+ } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return errno;
+ } else {
+ pr_perror("Read for %s:%s socket on fd=%d failed",
+ rimg->path, rimg->snapshot_id, fd);
+ if (close_fd)
+ close(fd);
+ return -1;
}
+ return n;
}
int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
{
int ret;
- struct roperation *op = malloc(sizeof(struct roperation));
- bzero(op, sizeof(struct roperation));
- op->fd = fd;
- op->rimg = rimg;
- op->flags = flags;
- op->close_fd = close_fd;
- op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
+ struct roperation *op = new_remote_operation(
+ rimg->path, rimg->snapshot_id, fd, flags, close_fd);
+ rop_set_rimg(op, rimg);
while ((ret = send_image_async(op)) < 0)
if (ret != EAGAIN && ret != EWOULDBLOCK)
return -1;
@@ -845,47 +1042,43 @@ int64_t send_image_async(struct roperation *op)
{
int fd = op->fd;
struct rimage *rimg = op->rimg;
- int flags = op->flags;
bool close_fd = op->close_fd;
int n;
- if (flags != O_APPEND) {
- op->curr_sent_buf = list_entry(rimg->buf_head.next, struct rbuf, l);
- op->curr_sent_bytes = 0;
- }
-
- while (1) {
- n = send(
- fd,
- op->curr_sent_buf->buffer + op->curr_sent_bytes,
- min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes,
- MSG_NOSIGNAL);
- if (n > -1) {
- op->curr_sent_bytes += n;
- if (op->curr_sent_bytes == BUF_SIZE) {
- op->curr_sent_buf =
- list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
- op->nblocks++;
- op->curr_sent_bytes = 0;
- } else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
- if (close_fd)
- close(fd);
- return op->nblocks*BUF_SIZE + op->curr_sent_buf->nbytes;
- }
- } else if (errno == EPIPE || errno == ECONNRESET) {
- pr_warn("Connection for %s:%s was closed early than expected\n",
- rimg->path, rimg->snapshot_id);
- return 0;
- } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
- return errno;
+ n = write(
+ fd,
+ op->curr_sent_buf->buffer + op->curr_sent_bytes,
+ min(BUF_SIZE, op->curr_sent_buf->nbytes) - op->curr_sent_bytes);
+
+ if (n > -1) {
+ op->curr_sent_bytes += n;
+ if (op->curr_sent_bytes == BUF_SIZE) {
+ op->curr_sent_buf =
+ list_entry(op->curr_sent_buf->l.next, struct rbuf, l);
+ op->curr_sent_bytes = 0;
+ return n;
}
- else {
- pr_perror("Write on %s:%s socket failed",
- rimg->path, rimg->snapshot_id);
- return -1;
+ // TODO - couldn't we just compare to the img size?
+ else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
+ if (close_fd)
+ close(fd);
+ return 0;
}
+ return n;
+ }
+ // TODO - couldn't these checks be made upstream?
+ else if (errno == EPIPE || errno == ECONNRESET) {
+ pr_warn("Connection for %s:%s was closed early than expected\n",
+ rimg->path, rimg->snapshot_id);
+ return 0;
+ } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return errno;
+ }
+ else {
+ pr_perror("Write on %s:%s socket failed",
+ rimg->path, rimg->snapshot_id);
+ return -1;
}
-
}
int read_remote_image_connection(char *snapshot_id, char *path)
diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
index 779a137f..029857f7 100644
--- a/criu/include/img-remote.h
+++ b/criu/include/img-remote.h
@@ -32,32 +32,31 @@ struct rbuf {
};
struct rimage {
+ /* Path and snapshot id together identify the image. */
char path[PATHLEN];
char snapshot_id[PATHLEN];
+ /* Anchor linking this image into the list of known images. */
struct list_head l;
+ /* Ordered list of struct rbuf buffers that hold the image contents. */
struct list_head buf_head;
- uint64_t size; /* number of bytes */
- pthread_mutex_t in_use; /* Only one operation at a time, per image. */
-};
-
-struct wthread {
- pthread_t tid;
- struct list_head l;
- /* Client fd. */
- int fd;
- /* The path and snapshot_id identify the request handled by this thread. */
- char path[PATHLEN];
- char snapshot_id[PATHLEN];
- int flags;
- /* This semph is used to wake this thread if the image is in memory.*/
- sem_t wakeup_sem;
+ /* Number of bytes stored so far (grows as data is received). */
+ uint64_t size;
+ /* Note: forward (send) operation only. Buffer to start forwarding from. */
+ struct rbuf *curr_fwd_buf;
+ /* Note: forward (send) operation only. Number of fwd bytes in 'curr_fwd_buf'. */
+ uint64_t curr_fwd_bytes;
};
/* Structure that describes the state of a remote operation on remote images. */
struct roperation {
+ /* List anchor. */
+ struct list_head l;
/* File descriptor being used. */
int fd;
- /* Remote image being used. */
+ /* Path and snapshot id identify the required image. */
+ char path[PATHLEN];
+ char snapshot_id[PATHLEN];
+ /* Remote image being used (may be null if the operation is pending). */
struct rimage *rimg;
/* Flags for the operation. */
int flags;
@@ -66,37 +65,23 @@ struct roperation {
/* Note: recv operation only. How much bytes should be received. */
uint64_t size;
/* Note: recv operation only. Buffer being writen. */
- struct rbuf *curr_recv_buf;
- /* Note: send operation only. Number of blocks already sent. */
- int nblocks;
+ struct rbuf *curr_recv_buf; // TODO - needed? Could be replaced by list.last!
/* Note: send operation only. Pointer to buffer being sent. */
struct rbuf *curr_sent_buf;
/* Note: send operation only. Number of bytes sent in 'curr_send_buf. */
uint64_t curr_sent_bytes;
};
-/* This variable is used to indicate when the dump is finished. */
-extern bool finished;
/* This is the proxy to cache TCP socket FD. */
extern int proxy_to_cache_fd;
/* This the unix socket used to fulfill local requests. */
extern int local_req_fd;
+/* True if we are running the cache/restore, false if proxy/dump. */
+extern bool restoring;
-int init_daemon(bool background, struct rimage *(*wfi)(struct wthread*));
-
-void join_workers(void);
-void unlock_workers(void);
-
-void prepare_recv_rimg(void);
-void finalize_recv_rimg(struct rimage *rimg);
-struct rimage *prepare_remote_image(char *path, char *namesapce, int flags);
+/*
+ * Run the epoll loop that serves local image requests (and, when restoring,
+ * the proxy connection) until both the local and remote sides are finished.
+ */
+void accept_image_connections(void);
struct rimage *get_rimg_by_name(const char *snapshot_id, const char *path);
-bool is_receiving(void);
-
-void *accept_local_image_connections(void *ptr);
-void *accept_remote_image_connections(void *ptr);
-int64_t forward_image(struct rimage *rimg);
int64_t send_image(int fd, struct rimage *rimg, int flags, bool image_check);
int64_t send_image_async(struct roperation *op);
int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool image_check);
--
2.17.0
More information about the CRIU
mailing list