[CRIU] [PATCH 1/5] Preparing image receive and send for asynchronous sockets.

Rodrigo Bruno rbruno at gsd.inesc-id.pt
Wed May 23 00:14:24 MSK 2018


Thanks, that would be great and then we could work on top of that!

Let me know when the merge is done.

best,
rodrigo

2018-05-16 6:35 GMT+01:00 Andrei Vagin <avagin at virtuozzo.com>:

> On Tue, May 15, 2018 at 08:52:03AM +0100, Rodrigo Bruno wrote:
> > Hi Andrei,
> >
> > thank you.
> >
> > So we can start working on fixing bugs in order for the zdtm tests to
> pass
> > sending patches against
> > https://github.com/avagin/criu/tree/remote ?
>
> I'm going to merge these patches into criu-dev soon.
>
> >
> > cheers,
> > rodrigo
> >
> > 2018-05-15 7:49 GMT+01:00 Andrei Vagin <avagin at virtuozzo.com>:
> >
> > >
> > > I rebased these patches and added a few minor fixes:
> > > https://github.com/avagin/criu/tree/remote
> > >
> > > On Mon, May 14, 2018 at 01:29:47AM +0100, rodrigo-bruno wrote:
> > > > From: Rodrigo Bruno <rbruno at gsd.inesc-id.pt>
> > > >
> > > > ---
> > > >  criu/img-remote.c         | 93 +++++++++++++++++++++++++++++-
> ---------
> > > >  criu/include/img-remote.h | 28 ++++++++++--
> > > >  2 files changed, 93 insertions(+), 28 deletions(-)
> > > >
> > > > diff --git a/criu/img-remote.c b/criu/img-remote.c
> > > > index f812c52d..70db71e2 100644
> > > > --- a/criu/img-remote.c
> > > > +++ b/criu/img-remote.c
> > > > @@ -470,8 +470,6 @@ static struct rimage *new_remote_image(char
> *path,
> > > char *snapshot_id)
> > > >       buf->nbytes = 0;
> > > >       INIT_LIST_HEAD(&(rimg->buf_head));
> > > >       list_add_tail(&(buf->l), &(rimg->buf_head));
> > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > rbuf, l);
> > > > -     rimg->curr_sent_bytes = 0;
> > > >
> > > >       if (pthread_mutex_init(&(rimg->in_use), NULL) != 0) {
> > > >               pr_err("Remote image in_use mutex init failed\n");
> > > > @@ -498,8 +496,6 @@ static struct rimage *clear_remote_image(struct
> > > rimage *rimg)
> > > >
> > > >       list_entry(rimg->buf_head.next, struct rbuf, l)->nbytes = 0;
> > > >       rimg->size = 0;
> > > > -     rimg->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> > > rbuf, l);
> > > > -     rimg->curr_sent_bytes = 0;
> > > >
> > > >       pthread_mutex_unlock(&(rimg->in_use));
> > > >
> > > > @@ -669,18 +665,43 @@ err:
> > > >       return NULL;
> > > >  }
> > > >
> > > > +
> > > > +int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool close_fd)
> > > > +{
> > > > +     int ret;
> > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > +     bzero(op, sizeof(struct roperation));
> > > > +     op->fd = fd;
> > > > +     op->rimg = rimg;
> > > > +     op->size = size;
> > > > +     op->flags = flags;
> > > > +     op->close_fd = close_fd;
> > > > +     op->curr_recv_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     while ((ret = recv_image_async(op)) < 0)
> > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > +                     return -1;
> > > > +     return ret;
> > > > +}
> > > > +
> > > >  /* Note: size is a limit on how much we want to read from the
> socket.
> > > Zero means
> > > >   * read until the socket is closed.
> > > >   */
> > > > -int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool close_fd)
> > > > +int64_t recv_image_async(struct roperation *op)
> > > >  {
> > > > -     struct rbuf *curr_buf = NULL;
> > > > +     int fd = op->fd;
> > > > +     struct rimage *rimg = op->rimg;
> > > > +     uint64_t size = op->size;
> > > > +     int flags = op->flags;
> > > > +     bool close_fd = op->close_fd;
> > > > +     struct rbuf *curr_buf = op->curr_recv_buf;
> > > >       int n;
> > > >
> > > > -     if (flags == O_APPEND)
> > > > -             curr_buf = list_entry(rimg->buf_head.prev, struct
> rbuf,
> > > l);
> > > > -     else
> > > > -             curr_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     if (curr_buf == NULL) {
> > > > +             if (flags == O_APPEND)
> > > > +                     curr_buf = list_entry(rimg->buf_head.prev,
> struct
> > > rbuf, l);
> > > > +             else
> > > > +                     curr_buf = list_entry(rimg->buf_head.next,
> struct
> > > rbuf, l);
> > > > +     }
> > > >
> > > >       while (1) {
> > > >               n = read(fd,
> > > > @@ -712,6 +733,8 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > uint64_t size, int flags, bool c
> > > >                                       close(fd);
> > > >                               return rimg->size;
> > > >                       }
> > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > +                     return errno;
> > > >               } else {
> > > >                       pr_perror("Read on %s:%s socket failed",
> > > >                               rimg->path, rimg->snapshot_id);
> > > > @@ -724,37 +747,59 @@ int64_t recv_image(int fd, struct rimage *rimg,
> > > uint64_t size, int flags, bool c
> > > >
> > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > close_fd)
> > > >  {
> > > > +     int ret;
> > > > +     struct roperation *op = malloc(sizeof(struct roperation));
> > > > +     bzero(op, sizeof(struct roperation));
> > > > +     op->fd = fd;
> > > > +     op->rimg = rimg;
> > > > +     op->flags = flags;
> > > > +     op->close_fd = close_fd;
> > > > +     op->curr_sent_buf = list_entry(rimg->buf_head.next, struct
> rbuf,
> > > l);
> > > > +     while ((ret = send_image_async(op)) < 0)
> > > > +             if (ret != EAGAIN && ret != EWOULDBLOCK)
> > > > +                     return -1;
> > > > +     return ret;
> > > > +}
> > > >
> > > > -     int n, nblocks = 0;
> > > > +int64_t send_image_async(struct roperation *op)
> > > > +{
> > > > +     int fd = op->fd;
> > > > +     struct rimage *rimg = op->rimg;
> > > > +     int flags = op->flags;
> > > > +     bool close_fd = op->close_fd;
> > > > +     int n;
> > > >
> > > >       if (flags != O_APPEND) {
> > > > -             rimg->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > struct rbuf, l);
> > > > -             rimg->curr_sent_bytes = 0;
> > > > +             op->curr_sent_buf = list_entry(rimg->buf_head.next,
> > > struct rbuf, l);
> > > > +             op->curr_sent_bytes = 0;
> > > >       }
> > > >
> > > >       while (1) {
> > > >               n = send(
> > > >                   fd,
> > > > -                 rimg->curr_sent_buf->buffer +
> rimg->curr_sent_bytes,
> > > > -                 min(BUF_SIZE, rimg->curr_sent_buf->nbytes) -
> > > rimg->curr_sent_bytes,
> > > > +                 op->curr_sent_buf->buffer + op->curr_sent_bytes,
> > > > +                 min(BUF_SIZE, op->curr_sent_buf->nbytes) -
> > > op->curr_sent_bytes,
> > > >                   MSG_NOSIGNAL);
> > > >               if (n > -1) {
> > > > -                     rimg->curr_sent_bytes += n;
> > > > -                     if (rimg->curr_sent_bytes == BUF_SIZE) {
> > > > -                             rimg->curr_sent_buf =
> > > > -                                 list_entry(rimg->curr_sent_
> buf->l.next,
> > > struct rbuf, l);
> > > > -                             nblocks++;
> > > > -                             rimg->curr_sent_bytes = 0;
> > > > -                     } else if (rimg->curr_sent_bytes ==
> > > rimg->curr_sent_buf->nbytes) {
> > > > +                     op->curr_sent_bytes += n;
> > > > +                     if (op->curr_sent_bytes == BUF_SIZE) {
> > > > +                             op->curr_sent_buf =
> > > > +                                 list_entry(op->curr_sent_buf->
> l.next,
> > > struct rbuf, l);
> > > > +                             op->nblocks++;
> > > > +                             op->curr_sent_bytes = 0;
> > > > +                     } else if (op->curr_sent_bytes ==
> > > op->curr_sent_buf->nbytes) {
> > > >                               if (close_fd)
> > > >                                       close(fd);
> > > > -                             return nblocks*BUF_SIZE +
> > > rimg->curr_sent_buf->nbytes;
> > > > +                             return op->nblocks*BUF_SIZE +
> > > op->curr_sent_buf->nbytes;
> > > >                       }
> > > >               } else if (errno == EPIPE || errno == ECONNRESET) {
> > > >                       pr_warn("Connection for %s:%s was closed early
> > > than expected\n",
> > > >                               rimg->path, rimg->snapshot_id);
> > > >                       return 0;
> > > > -             } else {
> > > > +             } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> > > > +                     return errno;
> > > > +             }
> > > > +             else {
> > > >                       pr_perror("Write on %s:%s socket failed",
> > > >                               rimg->path, rimg->snapshot_id);
> > > >                       return -1;
> > > > diff --git a/criu/include/img-remote.h b/criu/include/img-remote.h
> > > > index 1771d310..0947e7f0 100644
> > > > --- a/criu/include/img-remote.h
> > > > +++ b/criu/include/img-remote.h
> > > > @@ -36,10 +36,6 @@ struct rimage {
> > > >       char snapshot_id[PATHLEN];
> > > >       struct list_head l;
> > > >       struct list_head buf_head;
> > > > -     /* Used to track already sent buffers when the image is
> appended.
> > > */
> > > > -     struct rbuf *curr_sent_buf;
> > > > -     /* Similar to the previous field. Number of bytes sent in
> > > 'curr_sent_buf'. */
> > > > -     int curr_sent_bytes;
> > > >       uint64_t size; /* number of bytes */
> > > >       pthread_mutex_t in_use; /* Only one operation at a time, per
> > > image. */
> > > >  };
> > > > @@ -57,6 +53,28 @@ struct wthread {
> > > >       sem_t wakeup_sem;
> > > >  };
> > > >
> > > > +/* Structure that describes the state of a remote operation on
> remote
> > > images. */
> > > > +struct roperation {
> > > > +     /* File descriptor being used. */
> > > > +     int fd;
> > > > +     /* Remote image being used. */
> > > > +     struct rimage *rimg;
> > > > +     /* Flags for the operation. */
> > > > +     int flags;
> > > > +     /* If fd should be closed when the operation is done. */
> > > > +     bool close_fd;
> > > > +     /* Note: recv operation only. How much bytes should be
> received. */
> > > > +     uint64_t size;
> > > > +     /* Note: recv operation only. Buffer being writen. */
> > > > +     struct rbuf *curr_recv_buf;
> > > > +     /* Note: send operation only. Number of blocks already sent. */
> > > > +     int nblocks;
> > > > +     /* Note: send operation only. Pointer to buffer being sent. */
> > > > +     struct rbuf *curr_sent_buf;
> > > > +     /* Note: send operation only. Number of bytes sent in
> > > 'curr_send_buf. */
> > > > +     uint64_t curr_sent_bytes;
> > > > +};
> > > > +
> > > >  /* This variable is used to indicate when the dump is finished. */
> > > >  extern bool finished;
> > > >  /* This is the proxy to cache TCP socket FD. */
> > > > @@ -80,7 +98,9 @@ void *accept_remote_image_connections(void *ptr);
> > > >
> > > >  int64_t forward_image(struct rimage *rimg);
> > > >  int64_t send_image(int fd, struct rimage *rimg, int flags, bool
> > > image_check);
> > > > +int64_t send_image_async(struct roperation *op);
> > > >  int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int
> > > flags, bool image_check);
> > > > +int64_t recv_image_async(struct roperation *op);
> > > >
> > > >  int64_t read_remote_header(int fd, char *snapshot_id, char *path,
> int
> > > *open_mode, uint64_t *size);
> > > >  int64_t write_remote_header(int fd, char *snapshot_id, char *path,
> int
> > > open_mode, uint64_t size);
> > > > --
> > > > 2.17.0
> > > >
> > > > _______________________________________________
> > > > CRIU mailing list
> > > > CRIU at openvz.org
> > > > https://lists.openvz.org/mailman/listinfo/criu
> > >
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.openvz.org/pipermail/criu/attachments/20180522/4e62103d/attachment-0001.html>


More information about the CRIU mailing list