[Devel] [PATCH vz7 45/46] fuse: separate iqueue for clones
Maxim Patlasov
mpatlasov at virtuozzo.com
Fri Mar 24 19:30:35 PDT 2017
The patch implements AK's ideas about enhancing mt-fuse, which concern handling incoming requests in the
kernel. Currently, mainline puts all of them in a single (global) queue,
letting each userspace thread pick up an arbitrary "first" request from
the queue. AK suggests to introduce CPU_core-to-FUSE_device mapping in kernel.
Then, if a request originates on CPU_core==i, let's put it in a queue bound
to FUSE_device[i] and wakeup userspace waiting for events on this device.
Also, AK suggests enabling this technique only for FUSE_READ-s, putting all
other requests into a "main" FUSE_device queue. In turn, the userspace is
supposed to epoll() all FUSE_devices in some "main" thread, passing the task
of replying to a FUSE_READ request to some proper "auxiliary" thread
(the thread will be chosen based on the epoll() results).
Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
fs/fuse/control.c | 17 +++++++++-
fs/fuse/cuse.c | 6 +++-
fs/fuse/dev.c | 88 +++++++++++++++++++++++++++++++----------------------
fs/fuse/file.c | 14 +++++---
fs/fuse/fuse_i.h | 22 ++++++++++---
fs/fuse/inode.c | 30 ++++++++++++++----
6 files changed, 122 insertions(+), 55 deletions(-)
diff --git a/fs/fuse/control.c b/fs/fuse/control.c
index c35a69b..1461e58 100644
--- a/fs/fuse/control.c
+++ b/fs/fuse/control.c
@@ -281,7 +281,7 @@ static int fuse_conn_seq_open(struct file *filp, int list_id)
fcp->conn = conn;
switch (list_id) {
case FUSE_PENDING_REQ:
- fcp->req_list = &conn->iq.pending;
+ fcp->req_list = &conn->main_iq.pending;
break;
#if 0
case FUSE_PROCESSING_REQ:
@@ -400,10 +400,23 @@ static const struct file_operations fuse_conn_files_ops = {
static int fuse_conn_show(struct seq_file *sf, void *v)
{
struct fuse_conn *fc = sf->private;
+ struct fuse_dev *fud;
+ int n_total = 0;
+ int n_active = 0;
+
+ spin_lock(&fc->lock);
+ list_for_each_entry(fud, &fc->devices, entry) {
+ struct fuse_iqueue *fiq = fud->fiq;
+ if (waitqueue_active(&fiq->waitq))
+ n_active++;
+ n_total++;
+ }
+ spin_unlock(&fc->lock);
+
seq_printf(sf, "Connected: %d\n", fc->connected);
seq_printf(sf, "Initialized: %d\n", fc->initialized);
seq_printf(sf, "Blocked: %d\n", fc->blocked);
- seq_printf(sf, "WQ active: %d\n", waitqueue_active(&fc->iq.waitq));
+ seq_printf(sf, "WQ active: %d of %d\n", n_active, n_total);
seq_printf(sf, "Blocked_wq active: %d\n", waitqueue_active(&fc->blocked_waitq));
seq_printf(sf, "num_background: %d\n", fc->num_background);
seq_printf(sf, "num_waiting: %d\n", atomic_read(&fc->num_waiting));
diff --git a/fs/fuse/cuse.c b/fs/fuse/cuse.c
index 9935d02..b705461 100644
--- a/fs/fuse/cuse.c
+++ b/fs/fuse/cuse.c
@@ -503,7 +503,11 @@ static int cuse_channel_open(struct inode *inode, struct file *file)
if (!cc)
return -ENOMEM;
- fuse_conn_init(&cc->fc);
+ rc = fuse_conn_init(&cc->fc);
+ if (rc) {
+ kfree(cc);
+ return rc;
+ }
fud = fuse_dev_alloc(&cc->fc);
if (!fud) {
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 44cf447..08c825e 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -36,7 +36,8 @@ static struct fuse_dev *fuse_get_dev(struct file *file)
return ACCESS_ONCE(file->private_data);
}
-static void fuse_request_init(struct fuse_req *req, struct page **pages,
+static void fuse_request_init(struct fuse_conn *fc,
+ struct fuse_req *req, struct page **pages,
struct fuse_page_desc *page_descs,
unsigned npages)
{
@@ -50,10 +51,12 @@ static void fuse_request_init(struct fuse_req *req, struct page **pages,
req->pages = pages;
req->page_descs = page_descs;
req->max_pages = npages;
+ req->fiq = &fc->main_iq;
__set_bit(FR_PENDING, &req->flags);
}
-static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags)
+static struct fuse_req *__fuse_request_alloc(struct fuse_conn *fc,
+ unsigned npages, gfp_t flags)
{
struct fuse_req *req = kmem_cache_alloc(fuse_req_cachep, flags);
if (req) {
@@ -76,20 +79,20 @@ static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags)
return NULL;
}
- fuse_request_init(req, pages, page_descs, npages);
+ fuse_request_init(fc, req, pages, page_descs, npages);
}
return req;
}
-struct fuse_req *fuse_request_alloc(unsigned npages)
+struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages)
{
- return __fuse_request_alloc(npages, GFP_KERNEL);
+ return __fuse_request_alloc(fc, npages, GFP_KERNEL);
}
EXPORT_SYMBOL_GPL(fuse_request_alloc);
-struct fuse_req *fuse_request_alloc_nofs(unsigned npages)
+struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc, unsigned npages)
{
- return __fuse_request_alloc(npages, GFP_NOFS);
+ return __fuse_request_alloc(fc, npages, GFP_NOFS);
}
void fuse_request_free(struct fuse_req *req)
@@ -156,7 +159,7 @@ static struct fuse_req *__fuse_get_req(struct fuse_conn *fc, unsigned npages,
if (fc->conn_error)
goto out;
- req = fuse_request_alloc(npages);
+ req = fuse_request_alloc(fc, npages);
err = -ENOMEM;
if (!req) {
if (for_background)
@@ -223,7 +226,7 @@ static void put_reserved_req(struct fuse_conn *fc, struct fuse_req *req)
struct fuse_file *ff = file->private_data;
spin_lock(&fc->lock);
- fuse_request_init(req, req->pages, req->page_descs, req->max_pages);
+ fuse_request_init(fc, req, req->pages, req->page_descs, req->max_pages);
BUG_ON(ff->reserved_req);
ff->reserved_req = req;
wake_up_all(&fc->reserved_req_waitq);
@@ -253,7 +256,7 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc,
wait_event(fc->blocked_waitq, fc->initialized);
/* Matches smp_wmb() in fuse_set_initialized() */
smp_rmb();
- req = fuse_request_alloc(0);
+ req = fuse_request_alloc(fc, 0);
if (!req)
req = get_reserved_req(fc, file);
@@ -318,7 +321,7 @@ static void queue_request(struct fuse_iqueue *fiq, struct fuse_req *req)
void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget,
u64 nodeid, u64 nlookup)
{
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = &fc->main_iq;
forget->forget_one.nodeid = nodeid;
forget->forget_one.nlookup = nlookup;
@@ -335,12 +338,11 @@ void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget,
spin_unlock(&fiq->waitq.lock);
}
-static void flush_bg_queue(struct fuse_conn *fc)
+static void flush_bg_queue(struct fuse_conn *fc, struct fuse_iqueue *fiq)
{
while (fc->active_background < fc->max_background &&
!list_empty(&fc->bg_queue)) {
struct fuse_req *req;
- struct fuse_iqueue *fiq = &fc->iq;
req = list_entry(fc->bg_queue.next, struct fuse_req, list);
list_del(&req->list);
@@ -362,7 +364,7 @@ static void flush_bg_queue(struct fuse_conn *fc)
*/
static void request_end(struct fuse_conn *fc, struct fuse_req *req)
{
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = req->fiq;
if (test_and_set_bit(FR_FINISHED, &req->flags))
return;
@@ -389,7 +391,7 @@ static void request_end(struct fuse_conn *fc, struct fuse_req *req)
}
fc->num_background--;
fc->active_background--;
- flush_bg_queue(fc);
+ flush_bg_queue(fc, fiq);
spin_unlock(&fc->lock);
}
wake_up(&req->waitq);
@@ -415,7 +417,7 @@ static void queue_interrupt(struct fuse_iqueue *fiq, struct fuse_req *req)
static void request_wait_answer(struct fuse_conn *fc, struct fuse_req *req)
{
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = req->fiq;
int err;
if (!fc->no_interrupt) {
@@ -462,7 +464,7 @@ static void request_wait_answer(struct fuse_conn *fc, struct fuse_req *req)
static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req,
struct fuse_file *ff)
{
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = req->fiq;
BUG_ON(test_bit(FR_BACKGROUND, &req->flags));
spin_lock(&fiq->waitq.lock);
@@ -511,6 +513,8 @@ EXPORT_SYMBOL_GPL(fuse_request_send);
void fuse_request_send_background_locked(struct fuse_conn *fc,
struct fuse_req *req)
{
+ struct fuse_iqueue *fiq = req->fiq;
+
BUG_ON(!test_bit(FR_BACKGROUND, &req->flags));
if (!test_bit(FR_WAITING, &req->flags)) {
__set_bit(FR_WAITING, &req->flags);
@@ -526,7 +530,7 @@ void fuse_request_send_background_locked(struct fuse_conn *fc,
set_bdi_congested(&fc->bdi, BLK_RW_ASYNC);
}
list_add_tail(&req->list, &fc->bg_queue);
- flush_bg_queue(fc);
+ flush_bg_queue(fc, fiq);
}
void fuse_request_send_background(struct fuse_conn *fc, struct fuse_req *req)
@@ -557,7 +561,7 @@ static int fuse_request_send_notify_reply(struct fuse_conn *fc,
struct fuse_req *req, u64 unique)
{
int err = -ENODEV;
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = req->fiq;
__clear_bit(FR_ISREPLY, &req->flags);
req->in.h.unique = unique;
@@ -1207,7 +1211,7 @@ static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
{
ssize_t err;
struct fuse_conn *fc = fud->fc;
- struct fuse_iqueue *fiq = &fc->iq;
+ struct fuse_iqueue *fiq = fud->fiq;
struct fuse_pqueue *fpq = &fud->pq;
struct fuse_req *req;
struct fuse_in *in;
@@ -1918,7 +1922,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
if (oh.error == -ENOSYS)
fc->no_interrupt = 1;
else if (oh.error == -EAGAIN)
- queue_interrupt(&fc->iq, req);
+ queue_interrupt(req->fiq, req);
fuse_copy_finish(cs);
return nbytes;
@@ -2058,7 +2062,7 @@ static unsigned fuse_dev_poll(struct file *file, poll_table *wait)
if (!fud)
return POLLERR;
- fiq = &fud->fc->iq;
+ fiq = fud->fiq;
poll_wait(file, &fiq->waitq, wait);
spin_lock(&fiq->waitq.lock);
@@ -2103,6 +2107,22 @@ static void end_polls(struct fuse_conn *fc)
}
}
+void fuse_abort_iqueue(struct fuse_iqueue *fiq, struct list_head *to_end)
+{
+ struct fuse_req *req;
+
+ spin_lock(&fiq->waitq.lock);
+ fiq->connected = 0;
+ list_splice_init(&fiq->pending, to_end);
+ list_for_each_entry(req, to_end, list)
+ clear_bit(FR_PENDING, &req->flags);
+ while (forget_pending(fiq))
+ kfree(dequeue_forget(fiq, 1, NULL));
+ wake_up_all_locked(&fiq->waitq);
+ spin_unlock(&fiq->waitq.lock);
+ kill_fasync(&fiq->fasync, SIGIO, POLL_IN);
+}
+
/*
* Abort all requests.
*
@@ -2123,7 +2143,7 @@ static void end_polls(struct fuse_conn *fc)
*/
void fuse_abort_conn(struct fuse_conn *fc)
{
- struct fuse_iqueue *fiq = &fc->iq;
+ int cpu;
spin_lock(&fc->lock);
if (fc->connected) {
@@ -2154,18 +2174,14 @@ void fuse_abort_conn(struct fuse_conn *fc)
spin_unlock(&fpq->lock);
}
fc->max_background = UINT_MAX;
- flush_bg_queue(fc);
+ for_each_online_cpu(cpu)
+ flush_bg_queue(fc, per_cpu_ptr(fc->iqs, cpu));
+ flush_bg_queue(fc, &fc->main_iq);
+
+ for_each_online_cpu(cpu)
+ fuse_abort_iqueue(per_cpu_ptr(fc->iqs, cpu), &to_end2);
+ fuse_abort_iqueue(&fc->main_iq, &to_end2);
- spin_lock(&fiq->waitq.lock);
- fiq->connected = 0;
- list_splice_init(&fiq->pending, &to_end2);
- list_for_each_entry(req, &to_end2, list)
- clear_bit(FR_PENDING, &req->flags);
- while (forget_pending(fiq))
- kfree(dequeue_forget(fiq, 1, NULL));
- wake_up_all_locked(&fiq->waitq);
- spin_unlock(&fiq->waitq.lock);
- kill_fasync(&fiq->fasync, SIGIO, POLL_IN);
end_polls(fc);
wake_up_all(&fc->blocked_waitq);
spin_unlock(&fc->lock);
@@ -2195,7 +2211,7 @@ int fuse_dev_release(struct inode *inode, struct file *file)
end_requests(fc, &fpq->processing);
/* Are we the last open device? */
if (atomic_dec_and_test(&fc->dev_count)) {
- WARN_ON(fc->iq.fasync != NULL);
+ WARN_ON(fud->fiq->fasync != NULL);
fuse_abort_conn(fc);
}
fuse_dev_free(fud);
@@ -2212,7 +2228,7 @@ static int fuse_dev_fasync(int fd, struct file *file, int on)
return -EPERM;
/* No locking - fasync_helper does its own locking */
- return fasync_helper(fd, file, on, &fud->fc->iq.fasync);
+ return fasync_helper(fd, file, on, &fud->fiq->fasync);
}
static int fuse_device_clone(struct fuse_conn *fc, struct file *new)
diff --git a/fs/fuse/file.c b/fs/fuse/file.c
index 3a5388e..f00d05b 100644
--- a/fs/fuse/file.c
+++ b/fs/fuse/file.c
@@ -80,7 +80,7 @@ struct fuse_file *fuse_file_alloc(struct fuse_conn *fc)
ff->ff_state = 0;
ff->fc = fc;
- ff->reserved_req = fuse_request_alloc(0);
+ ff->reserved_req = fuse_request_alloc(fc, 0);
if (unlikely(!ff->reserved_req)) {
kfree(ff);
return NULL;
@@ -770,8 +770,12 @@ void fuse_read_fill(struct fuse_req *req, struct file *file, loff_t pos,
req->out.numargs = 1;
req->out.args[0].size = count;
- if (opcode == FUSE_READ)
+ if (opcode == FUSE_READ) {
+ struct fuse_iqueue *fiq = __this_cpu_ptr(ff->fc->iqs);
+ if (fiq->handled_by_fud)
+ req->fiq = fiq;
req->inode = file->f_dentry->d_inode;
+ }
}
static void fuse_release_user_pages(struct fuse_req *req, int write)
@@ -2003,7 +2007,7 @@ static int fuse_writepage_locked(struct page *page,
if (test_set_page_writeback(page))
BUG();
- req = fuse_request_alloc_nofs(1);
+ req = fuse_request_alloc_nofs(fc, 1);
if (!req)
goto err;
@@ -2247,7 +2251,7 @@ static int fuse_writepages_fill(struct page *page,
}
data->req = req =
- fuse_request_alloc_nofs(FUSE_MAX_PAGES_PER_REQ);
+ fuse_request_alloc_nofs(fc, FUSE_MAX_PAGES_PER_REQ);
if (!req) {
unlock_page(page);
return -ENOMEM;
@@ -2321,7 +2325,7 @@ static int fuse_writepages(struct address_space *mapping,
fuse_release_ff(inode, ff);
data.inode = inode;
- data.req = fuse_request_alloc_nofs(FUSE_MAX_PAGES_PER_REQ);
+ data.req = fuse_request_alloc_nofs(fc, FUSE_MAX_PAGES_PER_REQ);
err = -ENOMEM;
if (!data.req)
goto out;
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 77230c3..6a82ced 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -407,12 +407,18 @@ struct fuse_req {
/** Request is stolen from fuse_file->reserved_req */
struct file *stolen_file;
+
+ /** Request will be handled by fud pointing to this fiq */
+ struct fuse_iqueue *fiq;
};
struct fuse_iqueue {
/** Connection established */
unsigned connected;
+ /** # of fuds pointing to this fiq */
+ int handled_by_fud;
+
/** Readers of the connection are waiting on this */
wait_queue_head_t waitq;
@@ -457,6 +463,9 @@ struct fuse_dev {
/** Fuse connection for this device */
struct fuse_conn *fc;
+ /** Input queue */
+ struct fuse_iqueue *fiq;
+
/** Processing queue */
struct fuse_pqueue pq;
@@ -499,8 +508,11 @@ struct fuse_conn {
/** Maximum write size */
unsigned max_write;
- /** Input queue */
- struct fuse_iqueue iq;
+ /** Main input queue */
+ struct fuse_iqueue main_iq;
+
+ /** Per-cpu input queues */
+ struct fuse_iqueue __percpu *iqs;
/** The next unique kernel file handle */
u64 khctr;
@@ -818,9 +830,9 @@ void fuse_ctl_cleanup(void);
/**
* Allocate a request
*/
-struct fuse_req *fuse_request_alloc(unsigned npages);
+struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages);
-struct fuse_req *fuse_request_alloc_nofs(unsigned npages);
+struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc, unsigned npages);
/**
* Free a request
@@ -898,7 +910,7 @@ struct fuse_conn *fuse_conn_get(struct fuse_conn *fc);
/**
* Initialize fuse_conn
*/
-void fuse_conn_init(struct fuse_conn *fc);
+int fuse_conn_init(struct fuse_conn *fc);
/**
* Release reference to fuse_conn
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index 3ac204e..705c98b 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -424,12 +424,14 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid)
spin_lock(&fc->lock);
list_for_each_entry(fud, &fc->devices, entry) {
struct fuse_pqueue *fpq = &fud->pq;
+ struct fuse_iqueue *fiq = fud->fiq;
spin_lock(&fpq->lock);
fuse_kill_requests(fc, inode, &fpq->processing);
+ fuse_kill_requests(fc, inode, &fiq->pending);
fuse_kill_requests(fc, inode, &fpq->io);
spin_unlock(&fpq->lock);
}
- fuse_kill_requests(fc, inode, &fc->iq.pending);
+ fuse_kill_requests(fc, inode, &fc->main_iq.pending);
fuse_kill_requests(fc, inode, &fc->bg_queue);
wake_up(&fi->page_waitq); /* readpage[s] can wait on fuse wb */
spin_unlock(&fc->lock);
@@ -712,8 +714,9 @@ static void fuse_pqueue_init(struct fuse_pqueue *fpq)
fpq->connected = 1;
}
-void fuse_conn_init(struct fuse_conn *fc)
+int fuse_conn_init(struct fuse_conn *fc)
{
+ int cpu;
memset(fc, 0, sizeof(*fc));
spin_lock_init(&fc->lock);
mutex_init(&fc->inst_mutex);
@@ -722,7 +725,12 @@ void fuse_conn_init(struct fuse_conn *fc)
atomic_set(&fc->dev_count, 1);
init_waitqueue_head(&fc->blocked_waitq);
init_waitqueue_head(&fc->reserved_req_waitq);
- fuse_iqueue_init(&fc->iq);
+ fuse_iqueue_init(&fc->main_iq);
+ fc->iqs = alloc_percpu(struct fuse_iqueue);
+ if (!fc->iqs)
+ return -ENOMEM;
+ for_each_online_cpu(cpu)
+ fuse_iqueue_init(per_cpu_ptr(fc->iqs, cpu));
INIT_LIST_HEAD(&fc->bg_queue);
INIT_LIST_HEAD(&fc->entry);
INIT_LIST_HEAD(&fc->conn_files);
@@ -737,6 +745,7 @@ void fuse_conn_init(struct fuse_conn *fc)
fc->connected = 1;
fc->attr_version = 1;
get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key));
+ return 0;
}
EXPORT_SYMBOL_GPL(fuse_conn_init);
@@ -1070,6 +1079,7 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req)
static void fuse_free_conn(struct fuse_conn *fc)
{
WARN_ON(!list_empty(&fc->devices));
+ free_percpu(fc->iqs);
kfree(fc);
}
@@ -1128,9 +1138,11 @@ struct fuse_dev *fuse_dev_alloc(struct fuse_conn *fc)
fud = kzalloc(sizeof(struct fuse_dev), GFP_KERNEL);
if (fud) {
fud->fc = fuse_conn_get(fc);
+ fud->fiq = &fc->main_iq;
fuse_pqueue_init(&fud->pq);
spin_lock(&fc->lock);
+ fud->fiq->handled_by_fud++;
list_add_tail(&fud->entry, &fc->devices);
spin_unlock(&fc->lock);
}
@@ -1145,6 +1157,8 @@ void fuse_dev_free(struct fuse_dev *fud)
if (fc) {
spin_lock(&fc->lock);
+ fud->fiq->handled_by_fud--;
+ BUG_ON(fud->fiq->handled_by_fud < 0);
list_del(&fud->entry);
spin_unlock(&fc->lock);
@@ -1205,7 +1219,11 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
if (!fc)
goto err_fput;
- fuse_conn_init(fc);
+ err = fuse_conn_init(fc);
+ if (err) {
+ kfree(fc);
+ goto err_fput;
+ }
fc->release = fuse_free_conn;
fud = fuse_dev_alloc(fc);
@@ -1242,13 +1260,13 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
/* only now - we want root dentry with NULL ->d_op */
sb->s_d_op = &fuse_dentry_operations;
- init_req = fuse_request_alloc(0);
+ init_req = fuse_request_alloc(fc, 0);
if (!init_req)
goto err_put_root;
__set_bit(FR_BACKGROUND, &init_req->flags);
if (is_bdev || (fc->flags & FUSE_UMOUNT_WAIT)) {
- fc->destroy_req = fuse_request_alloc(0);
+ fc->destroy_req = fuse_request_alloc(fc, 0);
if (!fc->destroy_req)
goto err_free_init_req;
}
More information about the Devel
mailing list