[Devel] [PATCH VZ9 1/3] fs/fuse: fuse queue routing

Alexey Kuznetsov kuznet at virtuozzo.com
Tue Jan 9 16:35:40 MSK 2024


Generic fuse multiqueue support. It improves the previously existing per-cpu
routing and makes it extensible. At the moment three routing strategies
are implemented and tested:

1. Old per-cpu routing. Deprecated, but left for performance comparisons.
   Also, it can still be good in some situations.
2. Size buckets to support large fuse writes. Userspace selects it as
   the default for fuse writes.
3. Hash table by inode. Userspace selects it as the default for fuse reads.

Most likely we will need more modes, or will need to modify the existing
ones, to handle various scenarios that emerge during testing.
For example, the case of small 4k random IO has not been investigated yet.

Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
 fs/fuse/dev.c             |  50 +++++++++---
 fs/fuse/file.c            |  49 +++++++++---
 fs/fuse/fuse_i.h          |  21 +++++-
 fs/fuse/inode.c           | 189 ++++++++++++++++++++++++++++++++++++++--------
 include/uapi/linux/fuse.h |  18 +++++
 5 files changed, 271 insertions(+), 56 deletions(-)

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index ce5e72c..59b9465 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1352,7 +1352,7 @@ static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
 	if (nbytes < max_t(size_t, FUSE_MIN_READ_BUFFER,
 			   sizeof(struct fuse_in_header) +
 			   sizeof(struct fuse_write_in) +
-			   fc->max_write))
+			   (fiq->size ? : fc->max_write)))
 		return -EINVAL;
 
  restart:
@@ -2273,6 +2273,22 @@ void fuse_abort_iqueue(struct fuse_iqueue *fiq, struct list_head *to_end)
 	kill_fasync(&fiq->fasync, SIGIO, POLL_IN);
 }
 
+static void fuse_abort_routing(struct fuse_rtable *rt, struct list_head *to_end)
+{
+	if (rt->type == FUSE_ROUTING_CPU) {
+		int cpu;
+
+		for_each_online_cpu(cpu) {
+			fuse_abort_iqueue(per_cpu_ptr(rt->iqs_cpu, cpu), to_end);
+		}
+	} else if (rt->type == FUSE_ROUTING_SIZE || rt->type == FUSE_ROUTING_HASH) {
+		int i;
+
+		for (i = 0; i < rt->rt_size; i++)
+			fuse_abort_iqueue(rt->iqs_table + i, to_end);
+	}
+}
+
 /*
  * Abort all requests.
  *
@@ -2337,12 +2353,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fc->max_background = UINT_MAX;
 		flush_bg_queue_and_unlock(fc);
 
-		for_each_online_cpu(cpu) {
-			if (fc->riqs)
-				fuse_abort_iqueue(per_cpu_ptr(fc->riqs, cpu), &to_end);
-			if (fc->wiqs)
-				fuse_abort_iqueue(per_cpu_ptr(fc->wiqs, cpu), &to_end);
-		}
+		fuse_abort_routing(&fc->wrt, &to_end);
+		fuse_abort_routing(&fc->rrt, &to_end);
 		fuse_abort_iqueue(&fc->main_iq, &to_end);
 
 		end_polls(fc);
@@ -2455,11 +2467,31 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd,
 		}
 		break;
 	case FUSE_DEV_IOC_SETAFF:
-		res = fuse_install_percpu_iqs(fuse_get_dev(file), arg, 0);
+	{
+		struct fuse_iq_routing req = { .type = FUSE_ROUTING_CPU,
+					       .flags = FUSE_ROUTE_F_IOTYPE_R, .index = arg };
+
+		res = fuse_install_iq_route(fuse_get_dev(file), &req);
 		break;
+	}
 	case FUSE_DEV_IOC_SETAFF_W:
-		res = fuse_install_percpu_iqs(fuse_get_dev(file), arg, 1);
+	{
+		struct fuse_iq_routing req = { .type = FUSE_ROUTING_CPU,
+					       .flags = FUSE_ROUTE_F_IOTYPE_W, .index = arg };
+
+		res = fuse_install_iq_route(fuse_get_dev(file), &req);
 		break;
+	}
+	case FUSE_DEV_IOC_ROUTING:
+	{
+		struct fuse_iq_routing req;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+
+		res = fuse_install_iq_route(fuse_get_dev(file), &req);
+		break;
+	}
 	case FUSE_IOC_KIO_CALL:
 	{
 		struct fuse_kio_call req;
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 6ea5921..0712251 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -21,6 +21,7 @@
 #include <linux/task_io_accounting_ops.h>
 #include <linux/fiemap.h>
 #include <linux/file.h>
+#include <linux/jhash.h>
 
 struct workqueue_struct *fuse_fput_wq;
 static DEFINE_SPINLOCK(fuse_fput_lock);
@@ -768,6 +769,38 @@ static int fuse_fsync(struct file *file, loff_t start, loff_t end,
 	return err;
 }
 
+struct fuse_iqueue *fuse_route_io(struct fuse_conn *fc, struct fuse_rtable *rt, size_t iosize,
+				  struct inode *inode)
+{
+	struct fuse_iqueue *fiq;
+	int i;
+
+	switch (rt->type) {
+	case FUSE_ROUTING_CPU:
+		fiq = raw_cpu_ptr(rt->iqs_cpu);
+		if (fiq->handled_by_fud)
+			return fiq;
+		break;
+	case FUSE_ROUTING_HASH:
+		i = jhash_1word((u32)inode->i_ino, 0) % rt->rt_size;
+		fiq = rt->iqs_table + i;
+		if (fiq->handled_by_fud)
+			return fiq;
+		break;
+	case FUSE_ROUTING_SIZE:
+		if (iosize == 0)
+			return NULL;
+
+		for (i = 0; i < rt->rt_size; i++) {
+			fiq = rt->iqs_table + i;
+			if (iosize <= fiq->size && fiq->handled_by_fud)
+				return fiq;
+		}
+		break;
+	}
+	return NULL;
+}
+
 void fuse_read_args_fill(struct fuse_io_args *ia, struct file *file, loff_t pos,
 			 size_t count, int opcode)
 {
@@ -789,12 +822,7 @@ void fuse_read_args_fill(struct fuse_io_args *ia, struct file *file, loff_t pos,
 	args->io_inode = file_inode(file);
 
 	if (opcode == FUSE_READ) {
-		if (ff->fm->fc->riqs) {
-			struct fuse_iqueue *fiq = raw_cpu_ptr(ff->fm->fc->riqs);
-
-			if (fiq->handled_by_fud)
-				args->fiq = fiq;
-		}
+		args->fiq = fuse_route_io(ff->fm->fc, &ff->fm->fc->rrt, count, args->io_inode);
 		args->inode = file->f_path.dentry->d_inode;
 		args->ff = ff;
 	}
@@ -1303,12 +1331,7 @@ static void fuse_write_args_fill(struct fuse_io_args *ia, struct fuse_file *ff,
 	args->io_inode = inode;
 	args->ff = ff;
 
-	if (ff->fm->fc->wiqs) {
-		struct fuse_iqueue *fiq = raw_cpu_ptr(ff->fm->fc->wiqs);
-
-		if (fiq->handled_by_fud)
-			args->fiq = fiq;
-	}
+	args->fiq = fuse_route_io(ff->fm->fc, &ff->fm->fc->wrt, count, inode);
 }
 
 static unsigned int fuse_write_flags(struct kiocb *iocb)
@@ -1957,6 +1980,8 @@ static void fuse_send_writepage(struct fuse_mount *fm,
 	args->force = true;
 	args->nocreds = true;
 
+	args->fiq = fuse_route_io(fm->fc, &fm->fc->wrt, inarg->size, wpa->inode);
+
 	spin_unlock(&fi->lock);
 	err = fuse_simple_background(fm, args, GFP_NOFS | __GFP_NOFAIL);
 	spin_lock(&fi->lock);
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 74e95f5..c793bdc 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -507,6 +507,9 @@ struct fuse_iqueue {
 	/** # of fuds pointing to this fiq */
 	int handled_by_fud;
 
+	/** request size allowed for this fiq */
+	int size;
+
 	/** Lock protecting accesses to members of this structure */
 	spinlock_t lock;
 
@@ -667,6 +670,16 @@ static inline unsigned int fuse_qhash_bucket(void)
 }
 #endif
 
+struct fuse_rtable {
+	int	type;
+	int	rt_size;
+	union {
+		void				*iqs;
+		struct fuse_iqueue __percpu	*iqs_cpu;
+		struct fuse_iqueue		*iqs_table;
+	};
+};
+
 /**
  * A Fuse connection.
  *
@@ -713,9 +726,9 @@ struct fuse_conn {
 	/** Input queue */
 	struct fuse_iqueue main_iq;
 
-	/** Per-cpu input queues */
-	struct fuse_iqueue __percpu *riqs;
-	struct fuse_iqueue __percpu *wiqs;
+	/** fiq routing tables */
+	struct fuse_rtable wrt;
+	struct fuse_rtable rrt;
 
 	/** The next unique kernel file handle */
 	atomic64_t khctr;
@@ -1277,7 +1290,7 @@ int fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm,
 		    struct user_namespace *user_ns,
 		    const struct fuse_iqueue_ops *fiq_ops, void *fiq_priv);
 
-int fuse_install_percpu_iqs(struct fuse_dev *fud, int cpu, int rw);
+int fuse_install_iq_route(struct fuse_dev *fud, struct fuse_iq_routing *req);
 
 /**
  * Release reference to fuse_conn
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index c99dc3a..d5c3f7e 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -501,6 +501,31 @@ void fuse_kill_requests(struct fuse_conn *fc, struct inode *inode,
 }
 EXPORT_SYMBOL_GPL(fuse_kill_requests);
 
+static void fuse_kill_routing(struct fuse_rtable *rt, struct fuse_conn *fc, struct inode *inode)
+{
+	if (rt->type == FUSE_ROUTING_CPU) {
+		int cpu;
+
+		for_each_online_cpu(cpu) {
+			struct fuse_iqueue *fiq =  per_cpu_ptr(rt->iqs_cpu, cpu);
+
+			spin_lock(&fiq->lock);
+			fuse_kill_requests(fc, inode, &fiq->pending);
+			spin_unlock(&fiq->lock);
+		}
+	} else if (rt->type == FUSE_ROUTING_SIZE || rt->type == FUSE_ROUTING_HASH) {
+		int i;
+
+		for (i = 0; i < rt->rt_size; i++) {
+			struct fuse_iqueue *fiq =  rt->iqs_table + i;
+
+			spin_lock(&fiq->lock);
+			fuse_kill_requests(fc, inode, &fiq->pending);
+			spin_unlock(&fiq->lock);
+		}
+	}
+}
+
 int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 {
 	struct inode *inode;
@@ -554,6 +579,9 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
 		fuse_kill_requests(fc, inode, &fc->main_iq.pending);
 		spin_unlock(&fc->main_iq.lock);
 
+		fuse_kill_routing(&fc->rrt, fc, inode);
+		fuse_kill_routing(&fc->wrt, fc, inode);
+
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
 			struct fuse_iqueue *fiq = fud->fiq;
@@ -963,45 +991,146 @@ static void fuse_iqueue_init(struct fuse_iqueue *fiq,
 	fiq->priv = priv;
 }
 
-int fuse_install_percpu_iqs(struct fuse_dev *fud, int dest_cpu, int rw)
+static void fuse_free_routing(struct fuse_rtable *rt)
+{
+	if (rt->type == FUSE_ROUTING_CPU)
+		free_percpu(rt->iqs_cpu);
+	else if (rt->type == FUSE_ROUTING_SIZE || rt->type == FUSE_ROUTING_HASH)
+		kfree(rt->iqs_table);
+}
+
+static int alloc_rt_table(struct fuse_dev *fud, struct fuse_rtable *rt,
+			  struct fuse_iq_routing *req)
 {
 	int res = -EINVAL;
+	int idx;
+
+	switch (req->type) {
+	case FUSE_ROUTING_CPU:
+		if (req->index >= NR_CPUS || !cpu_possible(req->index))
+			break;
+
+		rt->iqs_cpu = alloc_percpu(struct fuse_iqueue);
+		if (!rt->iqs_cpu)
+			break;
+		for_each_possible_cpu(idx) {
+			fuse_iqueue_init(per_cpu_ptr(rt->iqs_cpu, idx), fud->fc->main_iq.ops,
+					 fud->fc->main_iq.priv);
+		}
+		res = 0;
+		break;
+	case FUSE_ROUTING_SIZE:
+		if (req->key > FUSE_MAX_MAX_PAGES*PAGE_SIZE || (req->key % PAGE_SIZE))
+			break;
+		fallthrough;
+	case FUSE_ROUTING_HASH:
+		if (req->index >= req->table_size)
+			break;
+		rt->rt_size = req->table_size;
+		rt->iqs_table = kcalloc(req->table_size, sizeof(struct fuse_iqueue), GFP_KERNEL);
+		if (!rt->iqs_table)
+			return -ENOMEM;
+		for (idx = 0; idx < rt->rt_size; idx++) {
+			fuse_iqueue_init(rt->iqs_table + idx, fud->fc->main_iq.ops,
+					 fud->fc->main_iq.priv);
+			rt->iqs_table[idx].size = 0;
+		}
+		res = 0;
+		break;
+	}
+	return res;
+}
 
-	if (dest_cpu < NR_CPUS && cpu_possible(dest_cpu)) {
-		struct fuse_iqueue __percpu **iqs_p = rw ? &fud->fc->wiqs : &fud->fc->riqs;
-		struct fuse_iqueue __percpu *iqs;
+static void adjust_rt_table(struct fuse_dev *fud, struct fuse_iqueue *fiq,
+			    struct fuse_iq_routing *req)
+{
+	u32 size = req->key;
 
-		iqs = *iqs_p;
-		if (iqs == NULL) {
-			int cpu;
+	fiq->size = size;
 
-			iqs = alloc_percpu(struct fuse_iqueue);
-			if (!iqs)
-				return -ENOMEM;
-			for_each_possible_cpu(cpu) {
-				fuse_iqueue_init(per_cpu_ptr(iqs, cpu), fud->fc->main_iq.ops,
-								  fud->fc->main_iq.priv);
-			}
-		}
+	if (fud->fc->max_pages < size / PAGE_SIZE)
+		fud->fc->max_pages = size / PAGE_SIZE;
 
-		spin_lock(&fud->fc->lock);
+	/* The first installed routing entry must establish the minimal size;
+	 * this is important for the size check in fuse_dev_do_read()
+	 */
+	if (fud->fc->main_iq.size == 0)
+		fud->fc->main_iq.size = size;
 
-		if (*iqs_p == NULL) {
-			*iqs_p = iqs;
-		} else if (*iqs_p != iqs) {
-			free_percpu(iqs);
-			iqs = *iqs_p;
-		}
+	if (req->flags & FUSE_ROUTE_F_IOTYPE_W) {
+		if (fud->fc->max_write < size)
+			fud->fc->max_write = size;
+	}
+	if (req->flags & FUSE_ROUTE_F_IOTYPE_R) {
+		if (fud->fc->max_read < size)
+			fud->fc->max_read = size;
+	}
+}
 
-		fud->fiq->handled_by_fud--;
-		BUG_ON(fud->fiq->handled_by_fud < 0);
+int fuse_install_iq_route(struct fuse_dev *fud, struct fuse_iq_routing *req)
+{
+	int res = -EINVAL;
+	struct fuse_rtable *rt = (req->flags & FUSE_ROUTE_F_IOTYPE_W) ? &fud->fc->wrt :
+		&fud->fc->rrt;
+	struct fuse_rtable rtl;
 
-		fud->fiq = per_cpu_ptr(iqs, dest_cpu);
+	if (rt->type != FUSE_ROUTING_NONE && rt->type != req->type)
+		return -EINVAL;
+
+	rtl.type = req->type;
+	rtl.iqs = rt->iqs;
+	if (rtl.iqs == NULL) {
+		res = alloc_rt_table(fud, &rtl, req);
+		if (res)
+			goto out;
+	}
 
-		fud->fiq->handled_by_fud++;
-		spin_unlock(&fud->fc->lock);
+	res = 0;
+	spin_lock(&fud->fc->lock);
+
+	if (rt->iqs == NULL) {
+		rt->iqs = rtl.iqs;
+		rt->type = rtl.type;
+		rt->rt_size = rtl.rt_size;
+	} else if (rt->iqs != rtl.iqs) {
+		fuse_free_routing(&rtl);
+		if (rt->type != req->type)
+			res = -EINVAL;
+	}
+
+	if (res)
+		goto out_unlock;
+
+	fud->fiq->handled_by_fud--;
+	BUG_ON(fud->fiq->handled_by_fud < 0);
+
+	switch (rt->type) {
+	case FUSE_ROUTING_CPU:
+		if (req->index >= NR_CPUS || !cpu_possible(req->index)) {
+			res = -EINVAL;
+			goto out_unlock;
+		}
+		fud->fiq = per_cpu_ptr(rt->iqs_cpu, req->index);
+		res = 0;
+		break;
+	case FUSE_ROUTING_SIZE:
+	case FUSE_ROUTING_HASH:
+		if (req->index >= rt->rt_size) {
+			res = -EINVAL;
+			goto out_unlock;
+		}
+		fud->fiq = rt->iqs_table + req->index;
+		if (rt->type == FUSE_ROUTING_SIZE)
+			adjust_rt_table(fud, fud->fiq, req);
 		res = 0;
+		break;
 	}
+
+	fud->fiq->handled_by_fud++;
+
+out_unlock:
+	spin_unlock(&fud->fc->lock);
+out:
 	return res;
 }
 
@@ -1486,10 +1615,8 @@ void fuse_send_init(struct fuse_mount *fm)
 void fuse_free_conn(struct fuse_conn *fc)
 {
 	WARN_ON(!list_empty(&fc->devices));
-	if (fc->riqs)
-		free_percpu(fc->riqs);
-	if (fc->wiqs)
-		free_percpu(fc->wiqs);
+	fuse_free_routing(&fc->wrt);
+	fuse_free_routing(&fc->rrt);
 	kfree_rcu(fc, rcu);
 }
 EXPORT_SYMBOL_GPL(fuse_free_conn);
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 8414d2c..607b1c1 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -946,11 +946,29 @@ struct fuse_notify_retrieve_in {
 	uint64_t	dummy4;
 };
 
+struct fuse_iq_routing {
+	uint16_t	type;
+#define FUSE_ROUTING_NONE	0
+#define FUSE_ROUTING_CPU	1
+#define FUSE_ROUTING_SIZE	2
+#define FUSE_ROUTING_HASH	3
+	uint16_t	flags;
+#define FUSE_ROUTE_F_IOTYPE_MASK	3
+#define FUSE_ROUTE_F_IOTYPE_R		1
+#define FUSE_ROUTE_F_IOTYPE_W		2
+	uint32_t	table_size;
+	uint32_t	index;
+	uint32_t	key;
+	uint32_t	reserved[3];
+};
+
 /* Device ioctls: */
 #define FUSE_DEV_IOC_MAGIC		229
 #define FUSE_DEV_IOC_CLONE		_IOR(FUSE_DEV_IOC_MAGIC, 0, uint32_t)
+/* *SETAFF* ioctls are kept for compatibility; use FUSE_DEV_IOC_ROUTING instead */
 #define FUSE_DEV_IOC_SETAFF		_IO(FUSE_DEV_IOC_MAGIC, 1)
 #define FUSE_DEV_IOC_SETAFF_W		_IO(FUSE_DEV_IOC_MAGIC, 2)
+#define FUSE_DEV_IOC_ROUTING		_IOR(FUSE_DEV_IOC_MAGIC, 6, struct fuse_iq_routing)
 
 struct fuse_lseek_in {
 	uint64_t	fh;
-- 
1.8.3.1



More information about the Devel mailing list