[Devel] [PATCH RHEL7 COMMIT] fs/fuse kio_pcs: fix grow work execution with zero
Konstantin Khorenko
khorenko at virtuozzo.com
Wed Jul 4 18:06:50 MSK 2018
The commit is pushed to "branch-rh7-3.10.0-862.3.2.vz7.61.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-862.3.2.vz7.61.12
------>
commit dd1f51ac2d6ff7ca469cea3ab73df2addd55ee97
Author: Pavel Butsykin <pbutsykin at virtuozzo.com>
Date: Wed Jul 4 18:06:50 2018 +0300
fs/fuse kio_pcs: fix grow work execution with zero
The bug is that the grow work can be queued twice for the same list of requests.
The first execution of fuse_size_grow_work() drains size.grow_queue, so the
second execution runs with an empty list. As a result, submit_size_grow() gets
called with size == 0.
How it's possible:

    cpu0                                          cpu1

fuse_size_grow_work()
  spin_lock(&di->lock)
  ...
  di->size.required = size;   <-- 1
  spin_unlock(&di->lock)
  submit_size_grow(inode, size == 1)
  ...
                                              spin_lock(&di->lock)
                                              ...
                                              wait_grow()
                                                list_add_tail(req with size = 2,
                                                              &di->size.grow_queue);
                                                if (!di->size.required)  /* required == 1 */
                                                    queue_work(pcs_wq, &di->size.work);
                                              spin_unlock(&di->lock)
                                              ...
  spin_lock(&di->lock)
  ...
  di->size.required = 0;
  if (!list_empty(&di->size.grow_queue))
      queue_work(pcs_wq, &di->size.work);   <-- queue work for req with size = 2
  spin_unlock(&di->lock);
  ...
                                              spin_lock(&di->lock)
                                              ...
fuse_size_grow_work()
  /* wait for di->lock */                     wait_grow()
                                                list_add_tail(req with size = 3,
                                                              &di->size.grow_queue);
                                                if (!di->size.required)  /* required == 0 */
                                                    queue_work(pcs_wq, &di->size.work);
                                                        <-- queue work for reqs with size = 2, 3
                                              spin_unlock(&di->lock)
  spin_lock(&di->lock)
  ...
  di->size.required = size;   <-- 3
  spin_unlock(&di->lock)
  submit_size_grow(inode, size == 3)
  ...
  spin_lock(&di->lock)
  ...
  di->size.required = 0;
  if (!list_empty(&di->size.grow_queue))    /* grow_queue is empty now */
      queue_work(pcs_wq, &di->size.work);
  spin_unlock(&di->lock);
  ...
fuse_size_grow_work()
  spin_lock(&di->lock)
  /* grow_queue is still empty */
  ...
  di->size.required = size;   <-- 0
  spin_unlock(&di->lock)
  submit_size_grow(inode, size == 0)   --> shrink
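For reference, here is a heavily condensed sketch of the pre-patch logic,
reconstructed from the hunks this patch removes (BUG_ONs, error handling and
TRACE calls dropped), with the race window marked in the comments:

	/* Pre-patch write path, called under di->lock */
	static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di,
			      unsigned long long required)
	{
		r->exec.size.required = required;
		r->exec.size.waiting = 1;
		list_add_tail(&r->exec.ireq.list, &di->size.grow_queue);
		/* required may be 0 even though an already queued (or still
		 * running) work execution is going to handle this request,
		 * so this can schedule one superfluous extra execution. */
		if (!di->size.required)
			queue_work(pcs_wq, &di->size.work);
	}

	/* Pre-patch worker, condensed */
	static void fuse_size_grow_work(struct work_struct *w)
	{
		unsigned long long size = 0;

		spin_lock(&di->lock);
		if (di->size.required) {	/* == 0 for the extra execution */
			spin_unlock(&di->lock);
			return;
		}
		list_for_each_entry(ireq, &di->size.grow_queue, list) {
			r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
			if (size < r->exec.size.required)
				size = r->exec.size.required;
		}
		di->size.required = size;	/* stays 0: the queue is empty */
		spin_unlock(&di->lock);

		submit_size_grow(inode, size);	/* size == 0 -> file is truncated */
		...
		spin_lock(&di->lock);
		/* move the satisfied requests to to_submit, then: */
		di->size.required = 0;
		if (!list_empty(&di->size.grow_queue))
			queue_work(pcs_wq, &di->size.work);
		spin_unlock(&di->lock);
		pcs_cc_requeue(di->cluster, &to_submit);
	}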
This patch simplifies the interaction with the pcs_dentry_info.size struct and
fixes the execution of the grow work with an empty queue.
There is no need to keep separate queues for grow and shrink operations, since
these operations cannot run in parallel. Therefore a single queue is used, and
size.shrink is replaced by size.op to distinguish the operations.
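A condensed view of the new scheme (extracted from the patch below, again with
BUG_ONs, TRACE calls and error handling dropped): the worker takes ownership of
the whole queue atomically and bails out when there is nothing to grow, while a
waiter re-arms the work only when it is the first one to queue a request.

	/* New write path, called under di->lock */
	static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di,
			      unsigned long long required)
	{
		r->exec.size_required = required;
		/* only the waiter that makes the queue non-empty arms the work */
		if (list_empty(&di->size.queue))
			queue_work(pcs_wq, &di->size.work);
		list_add_tail(&r->exec.ireq.list, &di->size.queue);
		di->size.required = max(di->size.required, required);
	}

	/* New worker, condensed */
	static void fuse_size_grow_work(struct work_struct *w)
	{
		LIST_HEAD(pending_reqs);
		unsigned long long size;

		spin_lock(&di->lock);
		size = di->size.required;
		if (!size) {
			/* nothing pending: return instead of growing to 0 */
			spin_unlock(&di->lock);
			return;
		}
		/* take all queued requests in one go */
		list_splice_tail_init(&di->size.queue, &pending_reqs);
		di->size.op = PCS_SIZE_GROW;
		spin_unlock(&di->lock);

		submit_size_grow(inode, size);
		...
	}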
https://jira.sw.ru/browse/PSBM-85945
Signed-off-by: Pavel Butsykin <pbutsykin at virtuozzo.com>
Acked-by: Alexey Kuznetsov <kuznet at virtuozzo.com>
---
fs/fuse/kio/pcs/pcs_client_types.h | 12 +++--
fs/fuse/kio/pcs/pcs_cluster.h | 7 +--
fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 106 +++++++++++++++++--------------------
3 files changed, 59 insertions(+), 66 deletions(-)
diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
index a060ed58f87d..089817b4af9c 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -46,6 +46,13 @@ struct pcs_mapping {
struct pcs_flow_table ftab;
};
+
+typedef enum {
+ PCS_SIZE_INACTION,
+ PCS_SIZE_GROW,
+ PCS_SIZE_SHRINK,
+} size_op_t;
+
struct fuse_inode;
struct pcs_dentry_info {
struct pcs_dentry_id id;
@@ -56,10 +63,9 @@ struct pcs_dentry_info {
spinlock_t lock;
struct {
struct work_struct work;
- unsigned long long shrink;
+ struct list_head queue;
unsigned long long required;
- struct list_head grow_queue;
- struct list_head shrink_queue;
+ size_op_t op;
} size;
struct fuse_inode *inode;
};
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 0f13345f919f..191753ba316d 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -8,12 +8,7 @@ struct fuse_conn;
/* Try to follows pcs/client/fused structure style */
struct pcs_fuse_exec_ctx {
struct pcs_int_request ireq;
- /* The file size control block */
- struct {
- unsigned long long required;
- unsigned char granted;
- unsigned char waiting;
- } size;
+ unsigned long long size_required;
struct {
pcs_api_iorequest_t req;
struct bio_vec *bvec;
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 60401252a3b0..c9a291391405 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -283,8 +283,9 @@ static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct ino
/* di.id.name.len = id->name.len; */
spin_lock_init(&di->lock);
- INIT_LIST_HEAD(&di->size.grow_queue);
- INIT_LIST_HEAD(&di->size.shrink_queue);
+ INIT_LIST_HEAD(&di->size.queue);
+ di->size.required = 0;
+ di->size.op = PCS_SIZE_INACTION;
INIT_WORK(&di->size.work, fuse_size_grow_work);
pcs_mapping_init(&pfc->cc, &di->mapping);
@@ -341,8 +342,7 @@ void kpcs_inode_release(struct fuse_inode *fi)
if(!di)
return;
- BUG_ON(!list_empty(&di->size.grow_queue));
- BUG_ON(!list_empty(&di->size.shrink_queue));
+ BUG_ON(!list_empty(&di->size.queue));
pcs_mapping_invalidate(&di->mapping);
pcs_mapping_deinit(&di->mapping);
/* TODO: properly destroy dentry info here!! */
@@ -658,92 +658,88 @@ static void fuse_size_grow_work(struct work_struct *w)
struct pcs_dentry_info* di = container_of(w, struct pcs_dentry_info, size.work);
struct inode *inode = &di->inode->inode;
struct pcs_int_request* ireq, *next;
- unsigned long long size = 0;
+ unsigned long long size;
int err;
- LIST_HEAD(to_submit);
+ LIST_HEAD(pending_reqs);
spin_lock(&di->lock);
- BUG_ON(di->size.shrink);
- if (di->size.required) {
+ BUG_ON(di->size.op != PCS_SIZE_INACTION);
+
+ size = di->size.required;
+ if (!size) {
+ BUG_ON(!list_empty(&di->size.queue));
spin_unlock(&di->lock);
+ TRACE("No more pending writes\n");
return;
}
- list_for_each_entry(ireq, &di->size.grow_queue, list) {
- struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
-
- TRACE("ino:%ld r(%p)->size:%lld required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+ BUG_ON(di->fileinfo.attr.size >= size);
- BUG_ON(!r->exec.size.required);
- if (size < r->exec.size.required)
- size = r->exec.size.required;
- }
- di->size.required = size;
+ list_splice_tail_init(&di->size.queue, &pending_reqs);
+ di->size.op = PCS_SIZE_GROW;
spin_unlock(&di->lock);
err = submit_size_grow(inode, size);
if (err) {
- LIST_HEAD(to_fail);
-
spin_lock(&di->lock);
+ di->size.op = PCS_SIZE_INACTION;
+ list_splice_tail_init(&di->size.queue, &pending_reqs);
di->size.required = 0;
- list_splice_tail_init(&di->size.grow_queue, &to_fail);
spin_unlock(&di->lock);
- pcs_ireq_queue_fail(&to_fail, err);
+ pcs_ireq_queue_fail(&pending_reqs, err);
return;
}
spin_lock(&di->lock);
- BUG_ON(di->size.shrink);
- BUG_ON(di->size.required != size);
- list_for_each_entry_safe(ireq, next, &di->size.grow_queue, list) {
- struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+ BUG_ON(di->size.required < size);
+ di->size.op = PCS_SIZE_INACTION;
- BUG_ON(!r->exec.size.required);
- BUG_ON(!r->exec.size.waiting);
- BUG_ON(r->exec.size.granted);
- if (size >= r->exec.size.required) {
- TRACE("resubmit ino:%ld r(%p)->size:%lld required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+ list_for_each_entry_safe(ireq, next, &di->size.queue, list) {
+ struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
- r->exec.size.waiting = 0;
- r->exec.size.granted = 1;
- list_move(&ireq->list, &to_submit);
+ BUG_ON(!r->exec.size_required);
+ if (size >= r->exec.size_required) {
+ TRACE("resubmit ino:%ld r(%p)->size:%lld required:%lld\n",
+ inode->i_ino, r, r->exec.size_required, size);
+ list_move(&ireq->list, &pending_reqs);
}
}
- di->size.required = 0;
- if (!list_empty(&di->size.grow_queue))
- queue_work(pcs_wq, &di->size.work);
+
+ if (list_empty(&di->size.queue))
+ di->size.required = 0;
spin_unlock(&di->lock);
- pcs_cc_requeue(di->cluster, &to_submit);
+ pcs_cc_requeue(di->cluster, &pending_reqs);
}
static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsigned long long required)
{
assert_spin_locked(&di->lock);
- BUG_ON(r->exec.size.waiting);
+ BUG_ON(r->exec.size_required);
BUG_ON(r->req.in.h.opcode != FUSE_WRITE && r->req.in.h.opcode != FUSE_FALLOCATE);
+ BUG_ON(di->size.op != PCS_SIZE_INACTION && di->size.op != PCS_SIZE_GROW);
TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
di->size.required, r, required);
- r->exec.size.required = required;
- r->exec.size.waiting = 1;
- list_add_tail(&r->exec.ireq.list, &di->size.grow_queue);
+ r->exec.size_required = required;
- if (!di->size.required)
+ if (list_empty(&di->size.queue))
queue_work(pcs_wq, &di->size.work);
+
+ list_add_tail(&r->exec.ireq.list, &di->size.queue);
+
+ di->size.required = max(di->size.required, required);
}
static void wait_shrink(struct pcs_fuse_req *r, struct pcs_dentry_info *di)
{
assert_spin_locked(&di->lock);
- BUG_ON(r->exec.size.waiting);
+ BUG_ON(r->exec.size_required);
/* Writes already blocked via fuse_set_nowrite */
BUG_ON(r->req.in.h.opcode != FUSE_READ);
TRACE("insert ino:%ld r:%p\n", r->req.io_inode->i_ino, r);
- r->exec.size.waiting = 1;
- list_add_tail(&r->exec.ireq.list, &di->size.shrink_queue);
+ list_add_tail(&r->exec.ireq.list, &di->size.queue);
}
/*
@@ -761,7 +757,7 @@ static int pcs_fuse_prep_rw(struct pcs_fuse_req *r)
spin_lock(&di->lock);
/* Deffer all requests if shrink requested to prevent livelock */
- if (di->size.shrink) {
+ if (di->size.op == PCS_SIZE_SHRINK) {
wait_shrink(r, di);
spin_unlock(&di->lock);
return 1;
@@ -968,25 +964,22 @@ static void _pcs_shrink_end(struct fuse_conn *fc, struct fuse_req *req)
kpcs_setattr_end(fc, req);
spin_lock(&di->lock);
- BUG_ON(!di->size.shrink);
+ BUG_ON(di->size.op != PCS_SIZE_SHRINK);
BUG_ON(di->size.required);
- BUG_ON(!list_empty(&di->size.grow_queue));
- list_splice_init(&di->size.shrink_queue, &dispose);
- di->size.shrink = 0;
+ list_splice_init(&di->size.queue, &dispose);
+ di->size.op = PCS_SIZE_INACTION;
spin_unlock(&di->lock);
while (!list_empty(&dispose)) {
struct pcs_int_request* ireq = list_first_entry(&dispose, struct pcs_int_request, list);
struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
- BUG_ON(!r->exec.size.waiting);
- BUG_ON(r->exec.size.granted);
+ BUG_ON(r->exec.size_required);
BUG_ON(r->req.in.h.opcode != FUSE_READ);
TRACE("resubmit %p\n", &r->req);
list_del_init(&ireq->list);
- r->exec.size.waiting = 0;
pcs_fuse_submit(pfc, &r->req, 1);
}
}
@@ -1030,7 +1023,6 @@ static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bo
struct pcs_fuse_req *r = pcs_req_from_fuse(req);
struct fuse_setattr_in *inarg = (void*) req->in.args[0].value;
struct pcs_dentry_info *di;
- int shrink = 0;
if (!(inarg->valid & FATTR_SIZE))
return 1;
@@ -1038,12 +1030,12 @@ static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bo
di = pcs_inode_from_fuse(fi);
spin_lock(&di->lock);
if (inarg->size < di->fileinfo.attr.size) {
- BUG_ON(di->size.shrink);
- di->size.shrink = shrink = 1;
+ BUG_ON(di->size.op != PCS_SIZE_INACTION);
+ di->size.op = PCS_SIZE_SHRINK;
}
spin_unlock(&di->lock);
r->end = req->end;
- if (shrink) {
+ if (di->size.op == PCS_SIZE_SHRINK) {
BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
/* wait for aio reads in flight */
inode_dio_wait(req->io_inode);