[Devel] [PATCH VZ9] fs/fuse kio: implement zerocopy read with krpc
Alexey Kuznetsov
kuznet at virtuozzo.com
Wed Sep 25 11:16:15 MSK 2024
Ack
On Wed, Sep 25, 2024 at 2:56 PM Liu Kui <kui.liu at virtuozzo.com> wrote:
>
> For FUSE_READ, original page vectors can be found from fuse core by
> fuse request sequence number and device id. When a read request rpc
> msg is sent via krpc, instead of a userspace buffer, userspace passes
> fuse request sequence number, device id and offset in original fuse
> request to krpc, so that krpc can directly use page vectors from fuse
> request as buffer for read response, achieving zerocopy - from network
> to end buffer directly.
>
> Since it's possible that some parts of fuse request do not come from
> network (not zerocopied via krpc), existing fuse reply is extended to
> handle such a case with special error code FUSE_OUT_KRPCZC and metadata
> is returned instead of data. Metadata is a list of chunk descriptors.
> A chunk descriptor tells whether data needs to be copied from userspace
> or has already been zerocopied into fuse request buffer, for a section
> of fuse request.
>
> https://virtuozzo.atlassian.net/browse/VSTOR-90183
>
> Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
> ---
> fs/fuse/dev.c | 129 ++++++++++++++++++++++++++++-
> fs/fuse/fuse_i.h | 8 ++
> fs/fuse/kio/pcs/pcs_krpc.c | 141 +++++++++++++++++++++++---------
> fs/fuse/kio/pcs/pcs_krpc.h | 10 ++-
> fs/fuse/kio/pcs/pcs_krpc_prot.h | 19 ++++-
> include/uapi/linux/fuse.h | 1 +
> 6 files changed, 260 insertions(+), 48 deletions(-)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index 01df9d4e4e70..f5594e49bc2f 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -2144,6 +2144,112 @@ static int copy_out_splices(struct fuse_copy_state *cs, struct fuse_args *args,
> goto out;
> }
>
> +static int copy_out_krpczc(struct fuse_copy_state *cs, struct fuse_args *args,
> + unsigned int nbytes)
> +{
> + struct fuse_args_pages *ap = container_of(args, typeof(*ap), args);
> + struct pcs_krpczc_chunk chunkarr_inline[8];
> + struct pcs_krpczc_chunk *chunkarr, *chunk;
> + int nchunks, i, didx;
> + unsigned int soff, doff, dend, poff;
> + void *dst;
> + int err;
> +
> + nchunks = nbytes - sizeof(struct fuse_out_header);
> +
> + /* this is always at least one chunk */
> + if (nchunks == 0)
> + return -EINVAL;
> +
> + if (nchunks % sizeof(struct pcs_krpczc_chunk))
> + return -EINVAL;
> +
> + if (nchunks > sizeof(chunkarr_inline)) {
> + chunkarr = kmalloc(nchunks, GFP_KERNEL);
> + if (!chunkarr)
> + return -ENOMEM;
> + } else
> + chunkarr = chunkarr_inline;
> +
> + err = fuse_copy_one(cs, chunkarr, nchunks);
> + if (err)
> + goto out;
> +
> + fuse_copy_finish(cs);
> +
> + nchunks /= sizeof(struct pcs_krpczc_chunk);
> +
> + soff = 0;
> + poff = 0;
> + didx = 0;
> + doff = ap->descs[didx].offset;
> + dend = doff + ap->descs[didx].length;
> + for (i = 0; i < nchunks; i++) {
> + void __user *src;
> + unsigned int ilen, copy;
> +
> + chunk = &chunkarr[i];
> + src = (void __user *)chunk->addr;
> + ilen = chunk->len;
> +
> + while (ilen) {
> + copy = min(ilen, dend - doff);
> +
> + /* copy data from userspace buff, otherwise data had been zero-copied */
> + if (src) {
> + dst = kmap_atomic(ap->pages[didx]);
> + if (copy_from_user(dst + doff, src, copy)) {
> + kunmap_atomic(dst);
> + err = -EFAULT;
> + goto out;
> + }
> + kunmap_atomic(dst);
> +
> + src += copy;
> + }
> +
> + doff += copy;
> + ilen -= copy;
> +
> + if (ilen) {
> + /* move to next page */
> + poff += ap->descs[didx].length;
> + didx++;
> + if (didx >= ap->num_pages)
> + goto out_overflow;
> +
> + doff = ap->descs[didx].offset;
> + dend = doff + ap->descs[didx].length;
> + }
> + }
> + soff += chunk->len;
> + }
> +
> + /* zero the remaining */
> + if (args->page_zeroing && didx < ap->num_pages) {
> + if (doff < dend) {
> + dst = kmap_atomic(ap->pages[didx]);
> + memset(dst + doff, 0, dend - doff);
> + kunmap_atomic(dst);
> + }
> +
> + for (didx++; didx < ap->num_pages; didx++) {
> + dst = kmap_atomic(ap->pages[didx]);
> + memset(dst + ap->descs[didx].offset, 0, ap->descs[didx].length);
> + kunmap_atomic(dst);
> + }
> + }
> +
> + err = 0;
> +out:
> + if (chunkarr != chunkarr_inline)
> + kfree(chunkarr);
> + return err;
> +
> +out_overflow:
> + err = -EMSGSIZE;
> + goto out;
> +}
> /*
> * Write a single reply to a request. First the header is copied from
> * the write buffer. The request is then searched on the processing
> @@ -2183,6 +2289,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
>
> err = -EINVAL;
> if (oh.error != FUSE_OUT_SPLICES &&
> + oh.error != FUSE_OUT_KRPCZC &&
> (oh.error <= -512 || oh.error > 0))
> goto copy_finish;
>
> @@ -2215,7 +2322,8 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
> goto copy_finish;
> }
>
> - if (oh.error == FUSE_OUT_SPLICES) {
> + if (oh.error == FUSE_OUT_SPLICES &&
> + oh.error == FUSE_OUT_KRPCZC) {
> if (req->args->out_numargs != 1 || !req->args->out_pages) {
> spin_unlock(&fpq->lock);
> err = -EINVAL;
> @@ -2235,6 +2343,9 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
> if (oh.error == FUSE_OUT_SPLICES) {
> req->out.h.error = 0;
> err = copy_out_splices(cs, req->args, nbytes);
> + } else if (oh.error == FUSE_OUT_KRPCZC) {
> + req->out.h.error = 0;
> + err = copy_out_krpczc(cs, req->args, nbytes);
> } else if (oh.error)
> err = nbytes != sizeof(oh) ? -EINVAL : 0;
> else
> @@ -2565,6 +2676,22 @@ int fuse_dev_release(struct inode *inode, struct file *file)
> }
> EXPORT_SYMBOL_GPL(fuse_dev_release);
>
> +struct fuse_req *fuse_dev_find_request(int fd, u64 unique)
> +{
> + struct file *f = fget(fd);
> + struct fuse_dev *fud = fuse_get_dev(f);
> + struct fuse_pqueue *fpq = &fud->pq;
> + struct fuse_req *req = NULL;
> +
> + spin_lock(&fpq->lock);
> + if (fpq->connected)
> + req = request_find(&fud->pq, unique);
> + spin_unlock(&fpq->lock);
> +
> + return req;
> +}
> +EXPORT_SYMBOL_GPL(fuse_dev_find_request);
> +
> static int fuse_dev_fasync(int fd, struct file *file, int on)
> {
> struct fuse_dev *fud = fuse_get_dev(file);
> diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
> index 090871a1e356..35488283cea3 100644
> --- a/fs/fuse/fuse_i.h
> +++ b/fs/fuse/fuse_i.h
> @@ -654,6 +654,12 @@ struct fuse_kio_ops {
> int fuse_register_kio(struct fuse_kio_ops *ops);
> void fuse_unregister_kio(struct fuse_kio_ops *ops);
>
> +struct pcs_krpczc_chunk {
> + u64 addr;
> + u32 offset;
> + u32 len;
> +};
> +
> #define FUSE_QHASH_SIZE 128
>
> #include <linux/jhash.h>
> @@ -1551,6 +1557,8 @@ void fuse_release_ff(struct inode *inode, struct fuse_file *ff);
> void fuse_kill_requests(struct fuse_conn *fc, struct inode *inode,
> struct list_head *req_list);
>
> +struct fuse_req *fuse_dev_find_request(int fd, u64 unique);
> +
> /* readdir.c */
> int fuse_readdir(struct file *file, struct dir_context *ctx);
>
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> index 0ef33b730204..8f62f12782e5 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.c
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -2,6 +2,8 @@
> * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
> */
>
> +#include "../../fuse_i.h"
> +
> #include <linux/types.h>
> #include <linux/list.h>
> #include <linux/refcount.h>
> @@ -79,7 +81,7 @@ static void krpc_req_complete(struct krpc_req *kreq, int error)
> chunk = &kreq->data_chunks[i];
> if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
> pcs_umem_release(chunk->umem);
> - else
> + else if (chunk->type == KRPC_CHUNK_TYPE_MR)
> pcs_mr_put(chunk->mr);
> }
>
> @@ -305,6 +307,76 @@ static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_re
> return res;
> }
>
> +
> +static void kreq_prepare_data_buff(struct krpc_req *kreq, struct krpc_chunk *chunk)
> +{
> + struct page *page;
> + u64 pos;
> + int offset, len;
> + struct bio_vec *bvec;
> +
> + pos = chunk->addr; // offset in request
> + len = chunk->len;
> +
> + /* build the bvecs for the data chunk*/
> + if (chunk->type == KRPC_CHUNK_TYPE_ZC) {
> + struct fuse_args *args = chunk->req->args;
> + struct fuse_io_args *ia = container_of(args, typeof(*ia), ap.args);
> + int i = 0;
> +
> + /* locate the first page */
> + while (i < ia->ap.num_pages) {
> + if (pos < ia->ap.descs[i].length)
> + break;
> + pos -= ia->ap.descs[i].length;
> + i++;
> + }
> +
> + offset = pos;
> + while (len) {
> + /* data bvec array overflow? */
> + BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
> +
> + bvec = &kreq->data_bvecs[kreq->nr_data_bvecs];
> +
> + bvec->bv_page = ia->ap.pages[i];
> + bvec->bv_offset = ia->ap.descs[i].offset + offset;
> + bvec->bv_len = len < (ia->ap.descs[i].length - offset) ? len :
> + (ia->ap.descs[i].length - offset);
> +
> + len -= bvec->bv_len;
> + kreq->nr_data_bvecs++;
> + i++;
> + offset = 0;
> + }
> + } else {
> + struct pcs_umem *umem;
> +
> + umem = (chunk->type == KRPC_CHUNK_TYPE_MR) ? chunk->mr->umem : chunk->umem;
> +
> + while (len) {
> + /* data bvec array overflow? */
> + BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
> +
> + bvec = &kreq->data_bvecs[kreq->nr_data_bvecs];
> +
> + offset = pos & (PAGE_SIZE - 1);
> + page = pcs_umem_page(umem, pos);
> + BUG_ON(!page);
> +
> + bvec->bv_page = page;
> + bvec->bv_offset = offset;
> +
> + bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
> +
> + pos += bvec->bv_len;
> + len -= bvec->bv_len;
> + kreq->nr_data_bvecs++;
> + }
> + }
> + BUG_ON(len != 0);
> +}
> +
> static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
> {
> struct krpc_req *kreq;
> @@ -358,58 +430,47 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
> bvec = &kreq->data_bvecs[0];
>
> for (i = 0; i < iocmsg->nr_data_chunks; i++) {
> - struct page *page;
> - u64 addr;
> - int offset, len;
> -
> - chunk->addr = chunk_bd->addr;
> chunk->len = chunk_bd->len;
> - if (chunk_bd->mr_id) {
> - chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
> +
> + switch (chunk_bd->type) {
> + case PCS_KRPC_BUF_TYPE_MR: {
> chunk->type = KRPC_CHUNK_TYPE_MR;
> + chunk->addr = chunk_bd->addr;
> + chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
> if (!chunk->mr) {
> res = -ENXIO;
> goto err_free_data_chunk;
> }
> - } else {
> - /* unregistered memory buf */
> + break;
> + }
> + case PCS_KRPC_BUF_TYPE_UMEM: {
> + chunk->type = KRPC_CHUNK_TYPE_UMEM;
> + chunk->addr = chunk_bd->addr;
> chunk->umem = pcs_umem_get(chunk->addr, chunk->len);
> if (IS_ERR(chunk->umem)) {
> res = PTR_ERR(chunk->umem);
> goto err_free_data_chunk;
> }
> -
> - chunk->type = KRPC_CHUNK_TYPE_UMEM;
> + break;
> }
> + case PCS_KRPC_BUF_TYPE_ZC: {
> + struct pcs_krpc_buf_desc_zc *chunk_bdzc = (struct pcs_krpc_buf_desc_zc *)chunk_bd;
>
> - /* build the bvecs for the data chunk*/
> - addr = chunk->addr;
> - len = chunk->len;
> -
> - while (len) {
> - /* data bvec array overflow? */
> - BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
> -
> - if (chunk->type == KRPC_CHUNK_TYPE_MR)
> - page = pcs_umem_page(chunk->mr->umem, addr);
> - else
> - page = pcs_umem_page(chunk->umem, addr);
> -
> - BUG_ON(!page);
> -
> - offset = addr & (PAGE_SIZE - 1);
> -
> - bvec->bv_page = page;
> - bvec->bv_offset = offset;
> -
> - bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
> -
> - addr += bvec->bv_len;
> - len -= bvec->bv_len;
> - bvec++;
> - kreq->nr_data_bvecs++;
> + chunk->type = KRPC_CHUNK_TYPE_ZC;
> + chunk->addr = chunk_bdzc->offset;
> + chunk->req = fuse_dev_find_request(chunk_bdzc->devfd, chunk_bdzc->unique);
> + if (!chunk->req) {
> + res = -ENXIO;
> + goto err_free_data_chunk;
> + }
> + break;
> }
> - BUG_ON(len != 0);
> + default:
> + res = -ENXIO;
> + goto err_free_data_chunk;
> + };
> +
> + kreq_prepare_data_buff(kreq, chunk);
>
> kreq->data_len += chunk->len;
> chunk++;
> @@ -450,7 +511,7 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
> chunk = &kreq->data_chunks[i];
> if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
> pcs_umem_release(chunk->umem);
> - else
> + else if (chunk->type == KRPC_CHUNK_TYPE_MR)
> pcs_mr_put(chunk->mr);
> }
> pcs_mr_put(kreq->hdr_chunk.mr);
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
> index 8100dfb2629d..33d758e2503e 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.h
> +++ b/fs/fuse/kio/pcs/pcs_krpc.h
> @@ -13,15 +13,17 @@
> #include "pcs_krpc_prot.h"
> #include "pcs_mr.h"
>
> -struct krpc_chunk {
> - u64 addr;
> - u32 len;
> - u32 type;
> #define KRPC_CHUNK_TYPE_MR 0
> #define KRPC_CHUNK_TYPE_UMEM 1
> +#define KRPC_CHUNK_TYPE_ZC 2
> +struct krpc_chunk {
> + u32 type;
> + u32 len;
> + u64 addr;
> union {
> struct pcs_mr *mr;
> struct pcs_umem *umem;
> + struct fuse_req *req;
> };
> };
>
> diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h
> index 4c5bbf4492d1..2729520a7cd0 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc_prot.h
> +++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h
> @@ -12,10 +12,23 @@
> #define PCS_KRPC_IOC_RECV_MSG _IO(PCS_KRPC_IOC_MAGIC, 11)
> #define PCS_KRPC_IOC_ABORT _IO(PCS_KRPC_IOC_MAGIC, 12)
>
> +#define PCS_KRPC_BUF_TYPE_MR 0
> +#define PCS_KRPC_BUF_TYPE_UMEM 1
> +#define PCS_KRPC_BUF_TYPE_ZC 2
> struct pcs_krpc_buf_desc {
> - u64 addr; /* buf address in userspace. */
> - u32 len; /* size of the buf */
> - u32 mr_id; /* mr id */
> + u32 type;
> + u32 len; /* size of the buf */
> + u64 addr; /* buf address in userspace. */
> + u32 mr_id; /* mr id */
> + u32 rsvd;
> +};
> +
> +struct pcs_krpc_buf_desc_zc {
> + u32 type;
> + u32 len;
> + u64 unique;
> + u32 devfd;
> + u32 offset;
> };
>
> #define PCS_KRPC_MSG_ALIGNMENT (512ULL)
> diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
> index eaaddd046f59..cb8e063a9a13 100644
> --- a/include/uapi/linux/fuse.h
> +++ b/include/uapi/linux/fuse.h
> @@ -904,6 +904,7 @@ struct fuse_out_header {
>
> /* Special error value for fuse_out_header */
> #define FUSE_OUT_SPLICES 0x40000000
> +#define FUSE_OUT_KRPCZC 0x40000001
>
> struct fuse_dirent {
> uint64_t ino;
> --
> 2.39.3 (Apple Git-146)
More information about the Devel
mailing list