[Devel] [PATCH RHEL9 COMMIT] fs/fuse/kio/krpc: krpc zero copy combined with csaccel
Konstantin Khorenko
khorenko@virtuozzo.com
Thu Jan 23 21:53:25 MSK 2025
The commit is pushed to "branch-rh9-5.14.0-427.44.1.vz9.80.x-ovz" and will appear at git@bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.44.1.vz9.80.4
------>
commit 03122113444f4f8abaae5ceb5491f4a8ba039c03
Author: Alexey Kuznetsov <kuznet@virtuozzo.com>
Date: Sat Jan 18 02:09:23 2025 +0800
fs/fuse/kio/krpc: krpc zero copy combined with csaccel
Bypass the local CS when doing reads from RAID files.
The feature is smart and powerful: it allows reducing the load
on CSes down to zero.
For now it is disabled by default, since we still have open questions
about direct memory access in case of client aborts. Unlikely,
yet we must have zero tolerance for this. Also, the base krpc is
not without flaws in any case, and completing this addon
requires stabilizing krpc itself first.
Signed-off-by: Alexey Kuznetsov <kuznet@virtuozzo.com>
Feature: fuse: kRPC - single RPC for kernel and userspace
---
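Usage sketch (not part of the commit itself): the patch adds a pcs_krpc_csaccel
module parameter, off by default. Assuming the kio/pcs code is built as the
fuse_kio_pcs module (the module name is an assumption, verify on the target
kernel), the bypass could be toggled roughly like this:

    # runtime toggle via sysfs; parameter name comes from the patch,
    # module name/path is assumed
    echo 1 > /sys/module/fuse_kio_pcs/parameters/pcs_krpc_csaccel
    # or at load time:
    modprobe fuse_kio_pcs pcs_krpc_csaccel=1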
fs/fuse/kio/pcs/pcs_cluster_core.c | 2 +
fs/fuse/kio/pcs/pcs_cs.h | 2 +
fs/fuse/kio/pcs/pcs_cs_accel.c | 223 +++++++++++++++++++++++++++++++++++++
fs/fuse/kio/pcs/pcs_krpc.c | 88 ++++++++++++++-
fs/fuse/kio/pcs/pcs_krpc.h | 5 +
fs/fuse/kio/pcs/pcs_req.h | 1 +
6 files changed, 318 insertions(+), 3 deletions(-)
diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c
index 10d61f203916..144e90350eaa 100644
--- a/fs/fuse/kio/pcs/pcs_cluster_core.c
+++ b/fs/fuse/kio/pcs/pcs_cluster_core.c
@@ -198,6 +198,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
cc->cfg.iolat_cutoff = PCS_MAX_IO_LATENCY*1000;
cc->cfg.use_unix_socket = 1;
+ cc->dummy_dentry.cluster = cc;
+
TRACE("Ok cc->{ cl_id:" CLUSTER_ID_FMT ", node_id:" NODE_FMT ", f:%x}\n",
CLUSTER_ID_ARGS(cc->eng.cluster_id), NODE_ARGS(cc->eng.local_id),
cc->eng.flags);
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0cb7d5012252..d2e80fed7882 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -238,5 +238,7 @@ int pcs_cs_dislog_event(struct pcs_cluster_core *cc, struct pcs_mds_cached_event
int pcs_dislog_is_host_down(struct pcs_cs_set *css, PCS_NODE_ID_T host_id);
void pcs_dislog_host_add(struct pcs_cs_set *css, u64 host_id);
void pcs_dislog_host_del(struct pcs_cs_set *css, u64 host_id);
+struct krpc_req;
+extern int pcs_csa_rpc_submit(struct pcs_cs *cs, struct pcs_cs_iohdr *h, struct krpc_req *kreq);
#endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index de39ecf2f974..66a0e0f83bbe 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -736,6 +736,191 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
return 0;
}
+static void csa_rpc_ireq_complete(struct pcs_int_request *ireq)
+{
+ struct krpc_req *kreq = ireq->completion_data.ctx;
+ struct pcs_krpc *krpc = kreq->krpc;
+ struct pcs_cs_iohdr *h;
+
+ if (pcs_if_error(&ireq->error)) {
+ struct pcs_rpc *ep = kreq->msg.rpc;
+
+ if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
+ kreq->msg.error = ireq->error;
+ goto out;
+ }
+
+ ireq_destroy(ireq);
+
+ kreq->msg.rpc = NULL;
+ pcs_krpc_get(krpc);
+ pcs_rpc_queue(ep, &kreq->msg);
+ if (atomic_dec_and_test(&krpc->iocount))
+ wake_up(&krpc->iowait);
+ pcs_rpc_put(ep);
+ pcs_krpc_put(krpc);
+ return;
+ }
+
+ h = (struct pcs_cs_iohdr *)kreq->hdr_buf;
+ h->hdr.type = PCS_CS_READ_RESP;
+ PCS_CS_RESET_TS_RECV(&h->sync, ktime_to_us(ireq->ts));
+ h->sync.misc |= PCS_CS_IO_CLEAR|PCS_CS_IO_SYNC; /* To make it different in logs */
+ h->sync.ts_io = ktime_to_us(ktime_sub(ktime_get(), ireq->ts));
+ h->sync.ts_net = 0;
+
+out:
+ ireq_destroy(ireq);
+
+ pcs_krpc_get(krpc);
+ pcs_krpc_response_done(&kreq->msg);
+ if (atomic_dec_and_test(&krpc->iocount))
+ wake_up(&krpc->iowait);
+ pcs_krpc_put(krpc);
+}
+
+static void csa_rpc_aio_work(struct work_struct *w)
+{
+ struct pcs_int_request *ireq = container_of(w, struct pcs_int_request, worker);
+ struct pcs_aio_req *areq = &ireq->iochunk.ar;
+ struct kiocb *iocb = &areq->iocb;
+ struct iov_iter *it = &areq->iter;
+ int ret;
+
+ ireq->submit_cpu = smp_processor_id();
+
+ ret = call_read_iter(iocb->ki_filp, iocb, it);
+
+ if (unlikely(ret != -EIOCBQUEUED)) {
+ if (ret != ireq->iochunk.size) {
+ FUSE_KTRACE(ireq->cc->fc,
+ "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+ ret,
+ ireq->error.remote ? (unsigned long)ireq->error.offender.val
+ : 0UL,
+ ireq, (unsigned long long)ireq->iochunk.chunk,
+ (unsigned int)ireq->iochunk.offset,
+ (unsigned int)ireq->iochunk.size);
+ if (ret < 0) {
+ FUSE_KTRACE(ireq->cc->fc, "segs=%d %d %u %u+%u", (int)it->nr_segs,
+ (int)it->count,
+ (unsigned int)it->iov_offset, it->bvec[0].bv_offset,
+ it->bvec[0].bv_len
+ );
+ }
+ pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, (PCS_NODE_ID_T){0});
+ } else {
+ /* IO already finished. Drop AIO refcnt and proceed to crc */
+ FUSE_KTRACE(ireq->cc->fc,
+ "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+ ireq, (unsigned long long)ireq->iochunk.chunk,
+ (unsigned int)ireq->iochunk.offset,
+ (unsigned int)ireq->iochunk.size);
+ }
+
+ BUG_ON(atomic_read(&areq->iocount) <= 0);
+ if (atomic_dec_and_test(&areq->iocount))
+ csa_complete_work(&areq->work);
+ }
+}
+
+static inline int csa_rpc_submit(struct file *file, struct file *cfile, int do_csum,
+ struct pcs_cs_iohdr *h, struct krpc_req *kreq)
+{
+ struct pcs_cluster_core *cc;
+ struct pcs_aio_req *areq;
+ struct kiocb *iocb;
+ struct iov_iter *it;
+ unsigned int size = h->size;
+ struct pcs_int_request *ireq;
+
+ /* XXX: TODO: move to workqueue more code to relieve sending thread */
+
+ ireq = __ireq_alloc();
+ if (!ireq)
+ return -ENOMEM;
+
+ cc = container_of(kreq->krpc->krpcs, struct pcs_cluster_core, krpcs);
+ ireq->cc = cc;
+ INIT_LIST_HEAD(&ireq->list);
+ ireq->dentry = &cc->dummy_dentry;
+ ireq->type = PCS_IREQ_IOCHUNK;
+ pcs_clear_error(&ireq->error);
+ ireq->flags = 0;
+ ireq->ts = ktime_get();
+ ireq->crcr_cpu = 0xFF;
+
+ ireq->completion_data.parent = NULL;
+ ireq->completion_data.ctx = kreq;
+ ireq->completion_data.priv = NULL;
+ INIT_HLIST_HEAD(&ireq->completion_data.child_list);
+ spin_lock_init(&ireq->completion_data.child_lock);
+ ireq->complete_cb = csa_rpc_ireq_complete;
+ INIT_WORK(&ireq->worker, csa_rpc_aio_work);
+ ireq->iochunk.map = NULL;
+ ireq->iochunk.size = h->size;
+ ireq->iochunk.offset = h->offset;
+ ireq->iochunk.csl = NULL;
+
+ areq = &ireq->iochunk.ar;
+ iocb = &areq->iocb;
+ it = &areq->iter;
+
+ areq->cfile = NULL;
+ areq->crc = NULL;
+ areq->crc_page[0] = NULL;
+ areq->crc_page[1] = NULL;
+
+ if (do_csum) {
+ if (cfile == NULL || ((size|ireq->iochunk.offset) & 4095)) {
+ ireq_destroy(ireq);
+ return -EINVAL;
+ }
+
+ quick_crc_fetch(ireq, cfile);
+ }
+
+ iov_iter_bvec(it, READ, &kreq->data_bvecs[0],
+ kreq->nr_data_bvecs, kreq->data_len);
+ iov_iter_truncate(it, size);
+
+ iocb->ki_pos = ireq->iochunk.offset;
+ iocb->ki_filp = get_file(file);
+ iocb->ki_complete = pcs_csa_complete;
+ iocb->ki_flags = IOCB_DIRECT;
+ iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+ /* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
+ atomic_set(&areq->iocount, 2);
+
+ queue_work(ireq->cc->wq, &ireq->worker);
+
+ /* Successful or queued read. Need to start crc read, if it is not ready already */
+ if (do_csum && !areq->crc_page[0]) {
+ FUSE_KDTRACE(ireq->cc->fc, "Not a quicky crc %lu at %u",
+ (unsigned long)cfile, (unsigned int)ireq->iochunk.offset);
+ INIT_WORK(&areq->work, csa_crc_work);
+ /* Grab ref for crc read work */
+ BUG_ON(atomic_read(&areq->iocount) <= 0);
+ atomic_inc(&areq->iocount);
+ areq->cfile = cfile;
+ get_file(cfile);
+ queue_work(ireq->cc->wq, &areq->work);
+ }
+
+ /* Why not pcs_csa_do_completion? Because we do not want to execute real completion
+ * on stack of caller, crypto is a stack hog. Normally, iocount > 1 here, but if all
+ * the IO happen to complete so quickly (or even synchronously) that we are ready already,
+ * it will be the last ref.
+ */
+ BUG_ON(atomic_read(&areq->iocount) <= 0);
+ if (atomic_dec_and_test(&areq->iocount)) {
+ INIT_WORK(&areq->work, csa_complete_work);
+ queue_work(enable_cpu_wq ? pcs_cpu_wq : ireq->cc->wq, &areq->work);
+ }
+ return 0;
+}
+
int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
{
struct pcs_csa_context * csa_ctx;
@@ -779,6 +964,44 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
return 0;
}
+int pcs_csa_rpc_submit(struct pcs_cs *cs, struct pcs_cs_iohdr *h, struct krpc_req *kreq)
+{
+ struct pcs_csa_context *csa_ctx;
+
+ rcu_read_lock();
+ csa_ctx = rcu_dereference(cs->csa_ctx);
+
+ if (csa_ctx) {
+ struct pcs_csa_entry *csa;
+
+ csa = __cse_lookup(csa_ctx, h->uid);
+ if (csa && !csa_ctx->tfm &&
+ memcmp(&h->map_version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+ (csa->flags & PCS_CSA_FL_READ)) {
+ struct file *file = get_file(csa->file);
+ struct file *cfile = csa->cfile ? get_file(csa->cfile) : NULL;
+ unsigned int flags = crc_verify ? csa->flags : 0;
+ int err;
+
+ kreq->msg.rpc = pcs_rpc_get(cs->rpc);
+
+ rcu_read_unlock();
+ err = csa_rpc_submit(file, cfile, flags&PCS_CSA_FL_CSUM, h, kreq);
+ fput(file);
+ if (cfile)
+ fput(cfile);
+ if (!err)
+ return 1;
+ rcu_read_lock();
+ /* Clear state which could be rewritten by csa */
+ pcs_rpc_put(kreq->msg.rpc);
+ kreq->msg.rpc = NULL;
+ }
+ }
+ rcu_read_unlock();
+ return 0;
+}
+
/* Write engine. It is similar to read, code could be merged. Actually the situation
* with nsrv=1 is just exactly the same. But yet reads can be optimized a lot better
* and we do not want to lose this advantage.
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 45ec3cb3207c..f52be006b91d 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -10,6 +10,7 @@
#include <linux/file.h>
#include <linux/anon_inodes.h>
#include <linux/delay.h>
+#include <linux/module.h>
#include "pcs_types.h"
#include "pcs_cluster.h"
@@ -19,6 +20,10 @@
static void kreq_release_data_chunks(struct krpc_req *kreq);
+unsigned int pcs_krpc_csaccel;
+module_param(pcs_krpc_csaccel, uint, 0644);
+MODULE_PARM_DESC(pcs_krpc_csaccel, "Enable krpc local cs bypass");
+
extern unsigned int pcs_krpc_version;
struct kmem_cache *krpc_req_cachep;
@@ -241,7 +246,7 @@ static void krpc_msg_get_data(struct pcs_msg *msg, int offset,
1, (hdr_size + data_size - offset));
}
-static void pcs_krpc_response_done(struct pcs_msg *msg)
+void pcs_krpc_response_done(struct pcs_msg *msg)
{
struct krpc_req *kreq = msg->private2;
@@ -313,7 +318,7 @@ static void kreq_release_data_chunks(struct krpc_req *kreq)
case KRPC_CHUNK_TYPE_MR:
pcs_mr_put(chunk->mr);
break;
- case KRPC_CHUNK_TYPE_ZC:
+ case KRPC_CHUNK_TYPE_ZC: {
int j, end;
end = chunk->bvec_idx_start + chunk->nr_bvecs;
@@ -321,6 +326,7 @@ static void kreq_release_data_chunks(struct krpc_req *kreq)
put_page(kreq->data_bvecs[j].bv_page);
break;
+ }
default:
WARN_ON("Invalid chunk type");
break;
@@ -405,6 +411,48 @@ static void kreq_prepare_data_buff(struct krpc_req *kreq, struct krpc_chunk *chu
BUG_ON(len != 0);
}
+static int try_local_bypass(struct pcs_krpc *krpc, struct krpc_req *kreq)
+{
+ struct pcs_cs_iohdr *h = (struct pcs_cs_iohdr *)kreq->hdr_buf;
+ struct pcs_cs *cs;
+
+ if (!pcs_krpc_csaccel)
+ return 0;
+ if (kreq->hdr_kv.iov_len < sizeof(*h))
+ return 0;
+ if (h->hdr.type != PCS_CS_READ_REQ)
+ return 0;
+ if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF))
+ return 0;
+ if ((h->offset | h->size) & (PAGE_SIZE - 1))
+ return 0;
+
+ cs = krpc->cs;
+ if (cs == NULL) {
+ struct pcs_cluster_core *cc =
+ container_of(krpc->krpcs, struct pcs_cluster_core, krpcs);
+ struct pcs_rpc *ep = krpc->rpc;
+
+ cs = pcs_cs_find_create(&cc->css, &ep->peer_id, &ep->addr, CS_FL_LOCAL_SOCK);
+ if (cs) {
+ if (cs->csa_ctx) {
+ spin_lock(&krpc->lock);
+ if (!krpc->cs) {
+ krpc->cs = cs;
+ cs->nmaps++;
+ }
+ spin_unlock(&krpc->lock);
+ }
+ spin_unlock(&cs->lock);
+ }
+ cs = krpc->cs;
+ }
+
+ if (cs && pcs_csa_rpc_submit(cs, h, kreq))
+ return 1;
+ return 0;
+}
+
static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
{
struct krpc_req *kreq;
@@ -527,10 +575,24 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
msg->get_iter = krpc_msg_get_data;
spin_lock(&krpc->lock);
+ if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
+ spin_unlock(&krpc->lock);
+ res = -ECONNABORTED;
+ goto err_free_data_chunk;
+ }
+ atomic_inc(&krpc->iocount);
kreq->krpc = pcs_krpc_get(krpc);
list_add_tail(&kreq->link, &krpc->pending_queue);
spin_unlock(&krpc->lock);
+
/* DTRACE to be added */
+ if (krpc->rpc->flags & PCS_RPC_F_LOCAL)
+ if (try_local_bypass(krpc, kreq))
+ return 0;
+
+ if (atomic_dec_and_test(&krpc->iocount))
+ wake_up(&krpc->iowait);
+
pcs_rpc_queue(krpc->rpc, msg);
return 0;
@@ -563,6 +625,12 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
krpc->state = PCS_KRPC_STATE_ABORTED;
+ if (atomic_read(&krpc->iocount)) {
+ spin_unlock(&krpc->lock);
+ wait_event(krpc->iowait, atomic_read(&krpc->iocount) == 0);
+ spin_lock(&krpc->lock);
+ }
+
/* dispose all unprocessed completions */
while (!list_empty(&krpc->completion_queue)) {
comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
@@ -764,10 +832,14 @@ int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
INIT_LIST_HEAD(&krpc->completion_queue);
init_waitqueue_head(&krpc->poll_wait);
+ atomic_set(&krpc->iocount, 0);
+ init_waitqueue_head(&krpc->iowait);
+
krpc->krpcs = krpcs;
krpc->id = *id;
krpc->gen = 0;
krpc->state = PCS_KRPC_STATE_UNCONN;
+ krpc->cs = NULL;
krpc->rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags);
if (!krpc->rpc) {
@@ -908,11 +980,13 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
static void __pcs_krpc_destroy(struct pcs_krpc *krpc)
{
+ struct pcs_cs *cs = NULL;
+
spin_lock(&krpc->lock);
krpc->state = PCS_KRPC_STATE_DESTROYED;
- /*Remove from krpc set*/
+ /* Remove from krpc set */
spin_lock(&krpc->krpcs->lock);
hlist_del(&krpc->hlist);
list_del(&krpc->link);
@@ -921,6 +995,14 @@ static void __pcs_krpc_destroy(struct pcs_krpc *krpc)
spin_unlock(&krpc->lock);
+ cs = krpc->cs;
+ if (cs != NULL) {
+ krpc->cs = NULL;
+ spin_lock(&cs->lock);
+ cs->nmaps--;
+ spin_unlock(&cs->lock);
+ }
+
if (krpc->rpc) {
krpc->rpc->clnt_krpc = NULL;
pcs_rpc_close(krpc->rpc);
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
index 85ddbe1b713c..c6b867b5fa75 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.h
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -73,8 +73,12 @@ struct pcs_krpc {
struct list_head completion_queue;
int nr_completion;
+ atomic_t iocount;
+ wait_queue_head_t iowait;
+
/** Wait queue head for poll */
wait_queue_head_t poll_wait;
+ struct pcs_cs *cs;
};
struct pcs_krpc_context {
@@ -154,4 +158,5 @@ struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc);
struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg);
+void pcs_krpc_response_done(struct pcs_msg *msg);
#endif
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 09eeb9d1a7d8..94fcfdbc7936 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -323,6 +323,7 @@ struct pcs_cluster_core
char nilbuffer[PCS_CS_MSG_ALIGNMENT];
struct kvec nilbuffer_kv;
+ struct pcs_dentry_info dummy_dentry;
};
static inline struct pcs_cluster_core *cc_from_csset(struct pcs_cs_set * css)