[Devel] [PATCH RHEL9 COMMIT] fs/fuse kio: implement zerocopy read with krpc

Konstantin Khorenko khorenko at virtuozzo.com
Mon Sep 30 18:05:12 MSK 2024


The commit is pushed to "branch-rh9-5.14.0-427.35.1.vz9.76.x-ovz" and will appear at git at bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.35.1.vz9.76.3
------>
commit 0e64d45fe53c4201218297109b230a3b7e21c894
Author: Liu Kui <kui.liu at virtuozzo.com>
Date:   Wed Sep 25 14:48:28 2024 +0800

    fs/fuse kio: implement zerocopy read with krpc
    
    For FUSE_READ, original page vectors can be found from fuse core by
    fuse request sequence number and device id. When a read request rpc
    msg is sent via krpc, instead of a userspace buffer, userspace passes
    fuse request sequence number, device id and offset in original fuse
    request to krpc, so that krpc can directly use page vectors from fuse
    request as buffer for read response, achieving zerocopy - from network
    to end buffer directly.
    
    Since it's possible that some parts of fuse request do not come from
    network (not zerocopied via krpc), existing fuse reply is extended to
    handle such case with special error code FUSE_OUT_KRPCZC and metadata
    is returned instead of data. Metadata is a list of chunk descriptors.
    A chunk descriptor tells whether data needs to be copied from userspace
    or has already been zerocopied into fuse request buffer, for a section
    of fuse request.
    
    https://virtuozzo.atlassian.net/browse/VSTOR-90183
    
    Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
    Acked-by: Alexey Kuznetsov <kuznet at virtuozzo.com>
---
 fs/fuse/dev.c                   | 129 +++++++++++++++++++++++++++++++++++-
 fs/fuse/fuse_i.h                |   8 +++
 fs/fuse/kio/pcs/pcs_krpc.c      | 141 ++++++++++++++++++++++++++++------------
 fs/fuse/kio/pcs/pcs_krpc.h      |  10 +--
 fs/fuse/kio/pcs/pcs_krpc_prot.h |  19 +++++-
 include/uapi/linux/fuse.h       |   1 +
 6 files changed, 260 insertions(+), 48 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 01df9d4e4e70..f5594e49bc2f 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2144,6 +2144,140 @@ static int copy_out_splices(struct fuse_copy_state *cs, struct fuse_args *args,
 	goto out;
 }
 
+/*
+ * Complete a FUSE_OUT_KRPCZC reply: the payload following the header is an
+ * array of struct pcs_krpczc_chunk descriptors rather than the data itself.
+ *
+ * Each descriptor covers chunk->len consecutive bytes of the request's output
+ * pages.  A non-zero chunk->addr is a userspace buffer those bytes are copied
+ * from; a zero addr means the bytes are already in place (zero-copied) and
+ * the section is only skipped over.
+ *
+ * @cs:     copy state the descriptor array is read from
+ * @args:   request arguments, embedded in a struct fuse_args_pages
+ * @nbytes: total reply size, struct fuse_out_header included
+ *
+ * Returns 0 on success, -EINVAL for an empty or malformed descriptor array,
+ * -ENOMEM, -EFAULT if a userspace buffer cannot be read, -EMSGSIZE if the
+ * descriptors describe more bytes than the output pages hold, or the error
+ * returned by fuse_copy_one().
+ */
+static int copy_out_krpczc(struct fuse_copy_state *cs, struct fuse_args *args,
+			    unsigned int nbytes)
+{
+	struct fuse_args_pages *ap = container_of(args, typeof(*ap), args);
+	struct pcs_krpczc_chunk chunkarr_inline[8];
+	struct pcs_krpczc_chunk *chunkarr, *chunk;
+	int nchunks, i, didx;
+	unsigned int doff, dend;
+	void *dst;
+	int err;
+
+	/* descs[0] is read unconditionally below */
+	if (!ap->num_pages)
+		return -EINVAL;
+
+	/* there is always at least one chunk */
+	if (nbytes <= sizeof(struct fuse_out_header))
+		return -EINVAL;
+
+	/* until it is divided below, nchunks is the array size in bytes */
+	nchunks = nbytes - sizeof(struct fuse_out_header);
+
+	if (nchunks % sizeof(struct pcs_krpczc_chunk))
+		return -EINVAL;
+
+	if (nchunks > sizeof(chunkarr_inline)) {
+		chunkarr = kmalloc(nchunks, GFP_KERNEL);
+		if (!chunkarr)
+			return -ENOMEM;
+	} else {
+		chunkarr = chunkarr_inline;
+	}
+
+	err = fuse_copy_one(cs, chunkarr, nchunks);
+	if (err)
+		goto out;
+
+	fuse_copy_finish(cs);
+
+	nchunks /= sizeof(struct pcs_krpczc_chunk);
+
+	didx = 0;
+	doff = ap->descs[didx].offset;
+	dend = doff + ap->descs[didx].length;
+	for (i = 0; i < nchunks; i++) {
+		void __user *src;
+		unsigned int ilen, copy;
+
+		chunk = &chunkarr[i];
+		src = (void __user *)chunk->addr;
+		ilen = chunk->len;
+
+		while (ilen) {
+			copy = min(ilen, dend - doff);
+
+			/* a zero addr marks data that is already in place (zero-copied) */
+			if (src) {
+				unsigned long left;
+
+				/*
+				 * copy_from_user() may have to fault the source
+				 * in, which cannot happen under kmap_atomic()
+				 * because it disables page faults; a local
+				 * mapping leaves them enabled.
+				 */
+				dst = kmap_local_page(ap->pages[didx]);
+				left = copy_from_user(dst + doff, src, copy);
+				kunmap_local(dst);
+				if (left) {
+					err = -EFAULT;
+					goto out;
+				}
+
+				src += copy;
+			}
+
+			doff += copy;
+			ilen -= copy;
+
+			if (ilen) {
+				/* move to next page */
+				didx++;
+				if (didx >= ap->num_pages)
+					goto out_overflow;
+
+				doff = ap->descs[didx].offset;
+				dend = doff + ap->descs[didx].length;
+			}
+		}
+	}
+
+	/* zero the remaining */
+	if (args->page_zeroing && didx < ap->num_pages) {
+		if (doff < dend) {
+			dst = kmap_atomic(ap->pages[didx]);
+			memset(dst + doff, 0, dend - doff);
+			kunmap_atomic(dst);
+		}
+
+		for (didx++; didx < ap->num_pages; didx++) {
+			dst = kmap_atomic(ap->pages[didx]);
+			memset(dst + ap->descs[didx].offset, 0, ap->descs[didx].length);
+			kunmap_atomic(dst);
+		}
+	}
+
+	err = 0;
+out:
+	if (chunkarr != chunkarr_inline)
+		kfree(chunkarr);
+	return err;
+
+out_overflow:
+	err = -EMSGSIZE;
+	goto out;
+}
 /*
  * Write a single reply to a request.  First the header is copied from
  * the write buffer.  The request is then searched on the processing
@@ -2183,6 +2289,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
 
 	err = -EINVAL;
 	if (oh.error != FUSE_OUT_SPLICES &&
+		oh.error != FUSE_OUT_KRPCZC &&
 	    (oh.error <= -512 || oh.error > 0))
 		goto copy_finish;
 
@@ -2215,7 +2322,9 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
 		goto copy_finish;
 	}
 
-	if (oh.error == FUSE_OUT_SPLICES) {
+	/* either special reply is only valid for a single page-backed out arg */
+	if (oh.error == FUSE_OUT_SPLICES ||
+	    oh.error == FUSE_OUT_KRPCZC) {
 		if (req->args->out_numargs != 1 || !req->args->out_pages) {
 			spin_unlock(&fpq->lock);
 			err = -EINVAL;
@@ -2235,6 +2343,9 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
 	if (oh.error == FUSE_OUT_SPLICES) {
 		req->out.h.error = 0;
 		err = copy_out_splices(cs, req->args, nbytes);
+	} else if (oh.error == FUSE_OUT_KRPCZC) {
+		req->out.h.error = 0;
+		err = copy_out_krpczc(cs, req->args, nbytes);
 	} else if (oh.error)
 		err = nbytes != sizeof(oh) ? -EINVAL : 0;
 	else
@@ -2565,6 +2676,52 @@ int fuse_dev_release(struct inode *inode, struct file *file)
 }
 EXPORT_SYMBOL_GPL(fuse_dev_release);
 
+/*
+ * Look up a request on the processing queue of a fuse device.
+ *
+ * @fd:     descriptor of the fuse device file, resolved in the calling task
+ * @unique: sequence number of the request to look for
+ *
+ * Returns the request, or NULL if @fd is not an open fuse device file, no
+ * fuse_dev is attached to it, the processing queue is disconnected, or
+ * request_find() finds nothing.
+ *
+ * NOTE(review): no reference is taken on the returned request, so nothing
+ * visible here keeps it alive once fpq->lock is dropped; callers presumably
+ * rely on the request staying on the processing queue - TODO confirm.
+ */
+struct fuse_req *fuse_dev_find_request(int fd, u64 unique)
+{
+	struct fuse_req *req = NULL;
+	struct fuse_pqueue *fpq;
+	struct fuse_dev *fud;
+	struct file *f;
+
+	f = fget(fd);
+	if (!f)
+		return NULL;
+
+	/* an arbitrary descriptor must not be interpreted as a fuse device */
+	if (f->f_op != &fuse_dev_operations)
+		goto out_fput;
+
+	fud = fuse_get_dev(f);
+	if (!fud)
+		goto out_fput;
+
+	fpq = &fud->pq;
+	spin_lock(&fpq->lock);
+	if (fpq->connected)
+		req = request_find(fpq, unique);
+	spin_unlock(&fpq->lock);
+
+out_fput:
+	/* drop the reference taken by fget() */
+	fput(f);
+	return req;
+}
+EXPORT_SYMBOL_GPL(fuse_dev_find_request);
+
 static int fuse_dev_fasync(int fd, struct file *file, int on)
 {
 	struct fuse_dev *fud = fuse_get_dev(file);
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 090871a1e356..35488283cea3 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -654,6 +654,12 @@ struct fuse_kio_ops {
 int fuse_register_kio(struct fuse_kio_ops *ops);
 void fuse_unregister_kio(struct fuse_kio_ops *ops);
 
+struct pcs_krpczc_chunk {
+	u64 addr;	/* userspace source address; 0 means the data is already in place */
+	u32 offset;	/* NOTE(review): not read by any code visible in this patch */
+	u32 len;	/* number of bytes of the fuse request this chunk covers */
+};
+
 #define FUSE_QHASH_SIZE 128
 
 #include <linux/jhash.h>
@@ -1551,6 +1557,8 @@ void fuse_release_ff(struct inode *inode, struct fuse_file *ff);
 void fuse_kill_requests(struct fuse_conn *fc, struct inode *inode,
 			struct list_head *req_list);
 
+struct fuse_req *fuse_dev_find_request(int fd, u64 unique);
+
 /* readdir.c */
 int fuse_readdir(struct file *file, struct dir_context *ctx);
 
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 0ef33b730204..8f62f12782e5 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -2,6 +2,8 @@
  * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
  */
 
+#include "../../fuse_i.h"
+
 #include <linux/types.h>
 #include <linux/list.h>
 #include <linux/refcount.h>
@@ -79,7 +81,7 @@ static void krpc_req_complete(struct krpc_req *kreq, int error)
 		chunk = &kreq->data_chunks[i];
 		if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
 			pcs_umem_release(chunk->umem);
-		else
+		else if (chunk->type == KRPC_CHUNK_TYPE_MR)
 			pcs_mr_put(chunk->mr);
 	}
 
@@ -305,6 +307,93 @@ static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_re
 	return res;
 }
 
+
+/*
+ * Append the bio_vecs describing @chunk's data buffer to kreq->data_bvecs.
+ *
+ * For KRPC_CHUNK_TYPE_ZC the buffer is the page array of the fuse request in
+ * chunk->req and chunk->addr is a byte offset into it.  For the other chunk
+ * types chunk->addr is passed to pcs_umem_page() to look each page up in the
+ * chunk's umem (taken from the MR for KRPC_CHUNK_TYPE_MR).
+ *
+ * Running out of data_bvecs slots, or out of request pages for a ZC chunk,
+ * trips a BUG_ON().
+ *
+ * NOTE(review): chunk->addr and chunk->len originate from the ioctl argument;
+ * presumably the caller should validate them so that these BUG_ON()s stay
+ * unreachable - TODO confirm.
+ */
+static void kreq_prepare_data_buff(struct krpc_req *kreq, struct krpc_chunk *chunk)
+{
+	struct page *page;
+	u64 pos;
+	int offset, len;
+	struct bio_vec *bvec;
+
+	pos = chunk->addr;	/* user address, or offset in request for ZC */
+	len = chunk->len;
+
+	/* build the bvecs for the data chunk */
+	if (chunk->type == KRPC_CHUNK_TYPE_ZC) {
+		struct fuse_args *args = chunk->req->args;
+		struct fuse_io_args *ia = container_of(args, typeof(*ia), ap.args);
+		int i = 0;
+
+		/* locate the first page */
+		while (i < ia->ap.num_pages) {
+			if (pos < ia->ap.descs[i].length)
+				break;
+			pos -= ia->ap.descs[i].length;
+			i++;
+		}
+
+		offset = pos;
+		while (len) {
+			/* data bvec array overflow? */
+			BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
+			/* chunk runs past the last page of the request? */
+			BUG_ON(i >= ia->ap.num_pages);
+
+			bvec = &kreq->data_bvecs[kreq->nr_data_bvecs];
+
+			bvec->bv_page = ia->ap.pages[i];
+			bvec->bv_offset = ia->ap.descs[i].offset + offset;
+			bvec->bv_len = len < (ia->ap.descs[i].length - offset) ? len :
+					(ia->ap.descs[i].length - offset);
+
+			len -= bvec->bv_len;
+			kreq->nr_data_bvecs++;
+			i++;
+			offset = 0;
+		}
+	} else {
+		struct pcs_umem *umem;
+
+		umem = (chunk->type == KRPC_CHUNK_TYPE_MR) ? chunk->mr->umem : chunk->umem;
+
+		while (len) {
+			/* data bvec array overflow? */
+			BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
+
+			bvec = &kreq->data_bvecs[kreq->nr_data_bvecs];
+
+			offset = pos & (PAGE_SIZE - 1);
+			page = pcs_umem_page(umem, pos);
+			BUG_ON(!page);
+
+			bvec->bv_page = page;
+			bvec->bv_offset = offset;
+
+			bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
+
+			pos += bvec->bv_len;
+			len -= bvec->bv_len;
+			kreq->nr_data_bvecs++;
+		}
+	}
+	BUG_ON(len != 0);
+}
+
 static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
 {
 	struct krpc_req *kreq;
@@ -358,58 +430,47 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
 	bvec = &kreq->data_bvecs[0];
 
 	for (i = 0; i < iocmsg->nr_data_chunks; i++) {
-		struct page *page;
-		u64 addr;
-		int offset, len;
-
-		chunk->addr = chunk_bd->addr;
 		chunk->len = chunk_bd->len;
-		if (chunk_bd->mr_id) {
-			chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
+
+		switch (chunk_bd->type) {
+		case PCS_KRPC_BUF_TYPE_MR: {
 			chunk->type = KRPC_CHUNK_TYPE_MR;
+			chunk->addr = chunk_bd->addr;
+			chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
 			if (!chunk->mr) {
 				res = -ENXIO;
 				goto err_free_data_chunk;
 			}
-		} else {
-			/* unregistered memory buf */
+			break;
+		}
+		case PCS_KRPC_BUF_TYPE_UMEM: {
+			chunk->type = KRPC_CHUNK_TYPE_UMEM;
+			chunk->addr = chunk_bd->addr;
 			chunk->umem = pcs_umem_get(chunk->addr, chunk->len);
 			if (IS_ERR(chunk->umem)) {
 				res = PTR_ERR(chunk->umem);
 				goto err_free_data_chunk;
 			}
-
-			chunk->type = KRPC_CHUNK_TYPE_UMEM;
+			break;
 		}
+		case PCS_KRPC_BUF_TYPE_ZC: {
+			struct pcs_krpc_buf_desc_zc *chunk_bdzc = (struct pcs_krpc_buf_desc_zc *)chunk_bd;
 
-		/* build the bvecs for the data chunk*/
-		addr = chunk->addr;
-		len = chunk->len;
-
-		while (len) {
-			/* data bvec array overflow? */
-			BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
-
-			if (chunk->type == KRPC_CHUNK_TYPE_MR)
-				page = pcs_umem_page(chunk->mr->umem, addr);
-			else
-				page = pcs_umem_page(chunk->umem, addr);
-
-			BUG_ON(!page);
-
-			offset = addr & (PAGE_SIZE - 1);
-
-			bvec->bv_page = page;
-			bvec->bv_offset = offset;
-
-			bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
-
-			addr += bvec->bv_len;
-			len -= bvec->bv_len;
-			bvec++;
-			kreq->nr_data_bvecs++;
+			chunk->type = KRPC_CHUNK_TYPE_ZC;
+			chunk->addr = chunk_bdzc->offset;
+			chunk->req = fuse_dev_find_request(chunk_bdzc->devfd, chunk_bdzc->unique);
+			if (!chunk->req) {
+				res = -ENXIO;
+				goto err_free_data_chunk;
+			}
+			break;
 		}
-		BUG_ON(len != 0);
+		default:
+			res = -ENXIO;
+			goto err_free_data_chunk;
+		};
+
+		kreq_prepare_data_buff(kreq, chunk);
 
 		kreq->data_len += chunk->len;
 		chunk++;
@@ -450,7 +511,7 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
 		chunk = &kreq->data_chunks[i];
 		if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
 			pcs_umem_release(chunk->umem);
-		else
+		else if (chunk->type == KRPC_CHUNK_TYPE_MR)
 			pcs_mr_put(chunk->mr);
 	}
 	pcs_mr_put(kreq->hdr_chunk.mr);
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
index 8100dfb2629d..33d758e2503e 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.h
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -13,15 +13,17 @@
 #include "pcs_krpc_prot.h"
 #include "pcs_mr.h"
 
-struct krpc_chunk {
-	u64		addr;
-	u32		len;
-	u32		type;
 #define KRPC_CHUNK_TYPE_MR		0
 #define KRPC_CHUNK_TYPE_UMEM	1
+#define KRPC_CHUNK_TYPE_ZC		2
+struct krpc_chunk {
+	u32		type;	/* one of KRPC_CHUNK_TYPE_*; selects the union member */
+	u32		len;	/* length of the chunk in bytes */
+	u64		addr;	/* userspace address; for ZC, offset in the fuse request */
 	union {
 		struct pcs_mr *mr;
 		struct pcs_umem *umem;
+		struct fuse_req *req;	/* KRPC_CHUNK_TYPE_ZC: request whose pages are the buffer */
 	};
 };
 
diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h
index 4c5bbf4492d1..2729520a7cd0 100644
--- a/fs/fuse/kio/pcs/pcs_krpc_prot.h
+++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h
@@ -12,10 +12,23 @@
 #define PCS_KRPC_IOC_RECV_MSG	_IO(PCS_KRPC_IOC_MAGIC, 11)
 #define PCS_KRPC_IOC_ABORT		_IO(PCS_KRPC_IOC_MAGIC, 12)
 
+#define PCS_KRPC_BUF_TYPE_MR	0
+#define PCS_KRPC_BUF_TYPE_UMEM	1
+#define PCS_KRPC_BUF_TYPE_ZC	2
 struct pcs_krpc_buf_desc {
-	u64  addr;		/* buf address in userspace. */
-	u32  len;		/* size of the buf */
-	u32  mr_id;		/* mr id */
+	u32 type;		/* one of PCS_KRPC_BUF_TYPE_* */
+	u32 len;		/* size of the buf */
+	u64 addr;		/* buf address in userspace. */
+	u32 mr_id;		/* mr id, looked up for PCS_KRPC_BUF_TYPE_MR */
+	u32 rsvd;		/* NOTE(review): not read by any code visible in this patch */
+};
+
+struct pcs_krpc_buf_desc_zc {	/* overlay of pcs_krpc_buf_desc used when type is PCS_KRPC_BUF_TYPE_ZC */
+	u32 type;		/* PCS_KRPC_BUF_TYPE_ZC */
+	u32 len;		/* size of the section in bytes */
+	u64 unique;		/* fuse request sequence number, passed to fuse_dev_find_request() */
+	u32 devfd;		/* fuse device fd, passed to fuse_dev_find_request() */
+	u32 offset;		/* byte offset of the section within the fuse request */
 };
 
 #define PCS_KRPC_MSG_ALIGNMENT		(512ULL)
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index eaaddd046f59..cb8e063a9a13 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -904,6 +904,7 @@ struct fuse_out_header {
 
 /* Special error value for fuse_out_header */
 #define FUSE_OUT_SPLICES	0x40000000
+#define FUSE_OUT_KRPCZC	0x40000001	/* reply body is a list of krpc zero-copy chunk descriptors */
 
 struct fuse_dirent {
 	uint64_t	ino;


More information about the Devel mailing list