[Devel] [PATCH RHEL9 COMMIT] fuse: scalable queue limiting

Konstantin Khorenko khorenko at virtuozzo.com
Wed Nov 1 22:53:43 MSK 2023


The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-284.25.1.vz9.30.8
------>
commit 81e667c754a6120d994c755443d7291e0a8d6c4f
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Fri Oct 6 18:44:22 2023 +0800

    fuse: scalable queue limiting
    
    This is the missing element of the previous scalability patch.
    There we removed all limits on direct io submitted to the cluster,
    which was not the right thing to do.
    
    The problem is not trivial. Experiments show that we cannot take _any_
    shared spinlock in this path: even an empty lock-unlock pair added
    there halves performance! So we have to come up with a scalable
    solution that does not use locks. Several approaches were tried,
    and the one suggested here looks the best.
    
    The idea is to create a hash table of counters and wait queues, with
    each sending thread mapped to a hash bucket based on its pid. We limit
    the number of pending requests in each bucket to
    (2 * max_background) / FUSE_QHASH_SIZE. It works surprisingly well,
    almost reaching the results obtained with an unlimited queue.
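
    As an illustration only, here is a minimal userspace sketch of the
    scheme (not the kernel code from the diff below: pthread condvars
    stand in for kernel waitqueues, a multiplicative hash stands in for
    jhash_1word(), and max_background = 256 is an assumed value):

        /* build with: cc -pthread sketch.c (gettid() needs glibc >= 2.30) */
        #define _GNU_SOURCE
        #include <pthread.h>
        #include <stdatomic.h>
        #include <unistd.h>

        #define QHASH_SIZE 64

        struct qbucket {
            atomic_int      num_reqs;   /* in-flight requests in this bucket */
            pthread_mutex_t lock;       /* per-bucket, never shared globally */
            pthread_cond_t  waitq;
        };

        static struct qbucket qhash[QHASH_SIZE];
        static int max_background = 256;    /* assumed tunable */

        /* Map the calling thread to a bucket; the patch uses
         * jhash_1word(current->pid, 0) for the same purpose. */
        static unsigned int qhash_bucket(void)
        {
            return ((unsigned int)gettid() * 0x61C88647u) >> 26;
        }

        /* Block until the bucket has room, then account one request,
         * mirroring the wait_event_killable_exclusive() added to
         * fuse_request_queue_background(). */
        static void qbucket_get(unsigned int bkt)
        {
            int limit = (2 * max_background) / QHASH_SIZE;

            pthread_mutex_lock(&qhash[bkt].lock);
            while (atomic_load(&qhash[bkt].num_reqs) >= limit)
                pthread_cond_wait(&qhash[bkt].waitq, &qhash[bkt].lock);
            atomic_fetch_add(&qhash[bkt].num_reqs, 1);
            pthread_mutex_unlock(&qhash[bkt].lock);
        }

        /* Drop the accounting and wake a waiter, mirroring the hunk in
         * __fuse_request_end(). */
        static void qbucket_put(unsigned int bkt)
        {
            int limit = (2 * max_background) / QHASH_SIZE;

            if (atomic_fetch_sub(&qhash[bkt].num_reqs, 1) - 1 < limit) {
                pthread_mutex_lock(&qhash[bkt].lock);
                pthread_cond_signal(&qhash[bkt].waitq);
                pthread_mutex_unlock(&qhash[bkt].lock);
            }
        }

        int main(void)
        {
            unsigned int i, bkt;

            for (i = 0; i < QHASH_SIZE; i++) {
                pthread_mutex_init(&qhash[i].lock, NULL);
                pthread_cond_init(&qhash[i].waitq, NULL);
            }

            bkt = qhash_bucket();
            qbucket_get(bkt);   /* would precede req_send() in the patch */
            qbucket_put(bkt);   /* would run from request completion */
            return 0;
        }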
    
    Flaw: with a small number of threads and large aio queues, or when
    cluster latency is high, the effective per-thread limit can be too
    small. If this turns out to be a problem we will have to come up with
    some solution, yet it must remain free of global locks. I used to
    insist on this simple axiom, but not very aggressively, because the
    observed bad effect was small. Now we see a situation where it is
    really large, a 50% loss, and we cannot unsee it.
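
    For example, with an assumed max_background of 256 and
    FUSE_QHASH_SIZE = 64, each bucket admits (2 * 256) / 64 = 8 pending
    requests, so a single thread submitting aio with a queue depth of 128
    is throttled to 8 outstanding requests even though max_background
    would allow far more in total.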
    
    https://pmc.acronis.work/browse/VSTOR-54040
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: vStorage
---
 fs/fuse/dev.c    | 27 +++++++++++++++++++++++++--
 fs/fuse/fuse_i.h | 27 +++++++++++++++++++++++++++
 fs/fuse/inode.c  |  9 ++++++++-
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index b93d77af2d24..0023acc35204 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -362,9 +362,17 @@ void __fuse_request_end(struct fuse_req *req, bool flush_bg)
 			flush_bg_queue_and_unlock(fc);
 		else
 			spin_unlock(&fc->bg_lock);
-	} else if (test_bit(FR_NO_ACCT, &req->flags))
+	} else if (test_bit(FR_NO_ACCT, &req->flags)) {
+		unsigned int bkt = req->qhash;
+
 		bg = true;
 
+		if (atomic_dec_return(&fc->qhash[bkt].num_reqs) < ((2*fc->max_background) / FUSE_QHASH_SIZE)) {
+			if (waitqueue_active(&fc->qhash[bkt].waitq))
+				wake_up(&fc->qhash[bkt].waitq);
+		}
+	}
+
 	if (test_bit(FR_ASYNC, &req->flags)) {
 		req->args->end(fm, req->args, req->out.h.error);
 		if (!bg)
@@ -613,12 +621,25 @@ static int fuse_request_queue_background(struct fuse_req *req)
 	}
 	__set_bit(FR_ISREPLY, &req->flags);
 
-	if (fc->kio.op && req->args->async && !nonblocking &&
+	if (fc->kio.op && req->args->async && !nonblocking && READ_ONCE(fc->connected) &&
 	    (!ff || !test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))) {
 		int ret = fc->kio.op->req_classify(req, false, false);
 		if (likely(!ret)) {
+			unsigned int bkt = fuse_qhash_bucket();
 			__clear_bit(FR_BACKGROUND, &req->flags);
 			__set_bit(FR_NO_ACCT, &req->flags);
+			if (wait_event_killable_exclusive(fc->qhash[bkt].waitq,
+							  (atomic_read(&fc->qhash[bkt].num_reqs) <
+							   ((2 * fc->max_background) / FUSE_QHASH_SIZE) ||
+							   !READ_ONCE(fc->connected) ||
+							   (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state)))))
+				return -EIO;
+			if (!READ_ONCE(fc->connected))
+				return -ENOTCONN;
+			if (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))
+				return -EIO;
+			req->qhash = bkt;
+			atomic_inc(&fc->qhash[bkt].num_reqs);
 			fc->kio.op->req_send(req, true);
 			return 0;
 		} else if (ret < 0)
@@ -2323,6 +2344,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 
 		end_polls(fc);
 		wake_up_all(&fc->blocked_waitq);
+		for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++)
+			wake_up_all(&fc->qhash[cpu].waitq);
 		spin_unlock(&fc->lock);
 
 		end_requests(&to_end);
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index d9e27b36784a..487e1125f7e7 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -435,6 +435,8 @@ struct fuse_req {
 	/* Request flags, updated with test/set/clear_bit() */
 	unsigned long flags;
 
+	unsigned int qhash;
+
 	/* The request input header */
 	struct {
 		struct fuse_in_header h;
@@ -640,6 +642,29 @@ struct fuse_kio_ops {
 int fuse_register_kio(struct fuse_kio_ops *ops);
 void fuse_unregister_kio(struct fuse_kio_ops *ops);
 
+#define FUSE_QHASH_SIZE 64
+
+#include <linux/jhash.h>
+
+struct fuse_qhash_queue
+{
+	atomic_t		num_reqs;
+	wait_queue_head_t	waitq;
+} ____cacheline_aligned_in_smp;
+
+#if 0
+static inline unsigned int fuse_qhash_bucket(struct fuse_args * args)
+{
+	unsigned long val = (unsigned long)args;
+	return jhash_2words(val & 0xFFFFFFFFU, val >> 32, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#else
+static inline unsigned int fuse_qhash_bucket(void)
+{
+	return jhash_1word(current->pid, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#endif
+
 /**
  * A Fuse connection.
  *
@@ -726,6 +751,8 @@ struct fuse_conn {
 	/** waitq for blocked connection */
 	wait_queue_head_t blocked_waitq;
 
+	struct fuse_qhash_queue	qhash[FUSE_QHASH_SIZE];
+
 	/** Connection established, cleared on umount, connection
 	    abort and device release */
 	unsigned connected;
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 1eb64b13d508..33cd8f9944ca 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -498,7 +498,7 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 	struct inode *inode;
 	struct fuse_inode *fi;
 	struct fuse_file *ff;
-	int err;
+	int err, i;
 
 	if (!fc->async_read) {
 		printk(KERN_ERR "Turn async_read ON to use "
@@ -523,6 +523,9 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 	/* let them see FUSE_S_FAIL_IMMEDIATELY */
 	wake_up_all(&fc->blocked_waitq);
 
+	for (i = 0; i < FUSE_QHASH_SIZE; i++)
+		wake_up_all(&fc->qhash[i].waitq);
+
 	err = filemap_write_and_wait(inode->i_mapping);
 	if (!err || err == -EIO) { /* AS_EIO might trigger -EIO */
 		struct fuse_dev *fud;
@@ -975,6 +978,10 @@ int fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm,
 	refcount_set(&fc->count, 1);
 	atomic_set(&fc->dev_count, 1);
 	init_waitqueue_head(&fc->blocked_waitq);
+	for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++) {
+		atomic_set(&fc->qhash[cpu].num_reqs, 0);
+		init_waitqueue_head(&fc->qhash[cpu].waitq);
+	}
 	fuse_iqueue_init(&fc->main_iq, fiq_ops, fiq_priv);
 	fc->iqs = alloc_percpu(struct fuse_iqueue);
 	if (!fc->iqs)

