[Devel] [PATCH VZ9 20/20] fuse: scalable queue limiting
Alexey Kuznetsov
kuznet at virtuozzo.com
Fri Oct 6 13:44:22 MSK 2023
This is missing element in previous scalability patch.
We removes any limits on direct io submitted to cluster there,
which is not right thing to do.
The problem is not trivial. Experiments show we cannot do _any_ shared
spinlock in this path, even empty lock-unlock added there reduces
performance twice! So, we have to come with scalable solution
not using locks. Several approaches were tried, suggested one
looks the best.
The idea is to create hash table of counters and wait queues,
sending thread mapped to a hash bucket based on its pid.
We limit number of pending requests in each bucket by
max_background / HASH_SIZE. It works surprizingly well,
almost reaches results with unlimited queue.
Flaw: in case of small number of threads and large aio queues
or when cluster latency is high effective per-thread limit can
be too small. If we see a problem we have to come out with some
solution, yet it must be free of global locks. I used to insist
on this simple axiom, but not very aggessively, because actual
observed bad effect was small. Now we see the sutuation when
it is really large, 50% of loss, and we cannot unsee this.
Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
fs/fuse/dev.c | 27 +++++++++++++++++++++++++--
fs/fuse/fuse_i.h | 27 +++++++++++++++++++++++++++
fs/fuse/inode.c | 9 ++++++++-
3 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 592cb3a..15fec87b 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -362,9 +362,17 @@ void __fuse_request_end( struct fuse_req *req, bool flush_bg)
flush_bg_queue_and_unlock(fc);
else
spin_unlock(&fc->bg_lock);
- } else if (test_bit(FR_NO_ACCT, &req->flags))
+ } else if (test_bit(FR_NO_ACCT, &req->flags)) {
+ unsigned int bkt = req->qhash;
+
bg = true;
+ if (atomic_dec_return(&fc->qhash[bkt].num_reqs) < ((2*fc->max_background) / FUSE_QHASH_SIZE)) {
+ if (waitqueue_active(&fc->qhash[bkt].waitq))
+ wake_up(&fc->qhash[bkt].waitq);
+ }
+ }
+
if (test_bit(FR_ASYNC, &req->flags)) {
req->args->end(fm, req->args, req->out.h.error);
if (!bg)
@@ -613,12 +621,25 @@ static int fuse_request_queue_background(struct fuse_req *req)
}
__set_bit(FR_ISREPLY, &req->flags);
- if (fc->kio.op && req->args->async && !nonblocking &&
+ if (fc->kio.op && req->args->async && !nonblocking && READ_ONCE(fc->connected) &&
(!ff || !test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))) {
int ret = fc->kio.op->req_classify(req, false, false);
if (likely(!ret)) {
+ unsigned int bkt = fuse_qhash_bucket();
__clear_bit(FR_BACKGROUND, &req->flags);
__set_bit(FR_NO_ACCT, &req->flags);
+ if (wait_event_killable_exclusive(fc->qhash[bkt].waitq,
+ (atomic_read(&fc->qhash[bkt].num_reqs) <
+ ((2 * fc->max_background) / FUSE_QHASH_SIZE) ||
+ !READ_ONCE(fc->connected) ||
+ (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state)))))
+ return -EIO;
+ if (!READ_ONCE(fc->connected))
+ return -ENOTCONN;
+ if (ff && test_bit(FUSE_S_FAIL_IMMEDIATELY, &ff->ff_state))
+ return -EIO;
+ req->qhash = bkt;
+ atomic_inc(&fc->qhash[bkt].num_reqs);
fc->kio.op->req_send(req, true);
return 0;
} else if (ret < 0)
@@ -2323,6 +2344,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
end_polls(fc);
wake_up_all(&fc->blocked_waitq);
+ for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++)
+ wake_up_all(&fc->qhash[cpu].waitq);
spin_unlock(&fc->lock);
end_requests(&to_end);
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index d9e27b3..487e112 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -435,6 +435,8 @@ struct fuse_req {
/* Request flags, updated with test/set/clear_bit() */
unsigned long flags;
+ unsigned int qhash;
+
/* The request input header */
struct {
struct fuse_in_header h;
@@ -640,6 +642,29 @@ struct fuse_kio_ops {
int fuse_register_kio(struct fuse_kio_ops *ops);
void fuse_unregister_kio(struct fuse_kio_ops *ops);
+#define FUSE_QHASH_SIZE 64
+
+#include <linux/jhash.h>
+
+struct fuse_qhash_queue
+{
+ atomic_t num_reqs;
+ wait_queue_head_t waitq;
+} ____cacheline_aligned_in_smp;
+
+#if 0
+static inline unsigned int fuse_qhash_bucket(struct fuse_args * args)
+{
+ unsigned long val = (unsigned long)args;
+ return jhash_2words(val & 0xFFFFFFFFU, val >> 32, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#else
+static inline unsigned int fuse_qhash_bucket(void)
+{
+ return jhash_1word(current->pid, 0) & (FUSE_QHASH_SIZE - 1);
+}
+#endif
+
/**
* A Fuse connection.
*
@@ -726,6 +751,8 @@ struct fuse_conn {
/** waitq for blocked connection */
wait_queue_head_t blocked_waitq;
+ struct fuse_qhash_queue qhash[FUSE_QHASH_SIZE];
+
/** Connection established, cleared on umount, connection
abort and device release */
unsigned connected;
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 1eb64b1..33cd8f9 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -498,7 +498,7 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
struct inode *inode;
struct fuse_inode *fi;
struct fuse_file *ff;
- int err;
+ int err, i;
if (!fc->async_read) {
printk(KERN_ERR "Turn async_read ON to use "
@@ -523,6 +523,9 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
/* let them see FUSE_S_FAIL_IMMEDIATELY */
wake_up_all(&fc->blocked_waitq);
+ for (i = 0; i < FUSE_QHASH_SIZE; i++)
+ wake_up_all(&fc->qhash[i].waitq);
+
err = filemap_write_and_wait(inode->i_mapping);
if (!err || err == -EIO) { /* AS_EIO might trigger -EIO */
struct fuse_dev *fud;
@@ -975,6 +978,10 @@ int fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm,
refcount_set(&fc->count, 1);
atomic_set(&fc->dev_count, 1);
init_waitqueue_head(&fc->blocked_waitq);
+ for (cpu = 0; cpu < FUSE_QHASH_SIZE; cpu++) {
+ atomic_set(&fc->qhash[cpu].num_reqs, 0);
+ init_waitqueue_head(&fc->qhash[cpu].waitq);
+ }
fuse_iqueue_init(&fc->main_iq, fiq_ops, fiq_priv);
fc->iqs = alloc_percpu(struct fuse_iqueue);
if (!fc->iqs)
--
1.8.3.1
More information about the Devel
mailing list