[Devel] [PATCH RHEL7 COMMIT] fuse_kio_pcs: implement truncate
Konstantin Khorenko
khorenko at virtuozzo.com
Mon Feb 19 14:22:34 MSK 2018
The commit is pushed to "branch-rh7-3.10.0-693.17.1.vz7.45.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.17.1.vz7.43.7
------>
commit 8bfe46448fb9b35444e8cc7963b472459039758d
Author: Dmitry Monakhov <dmonakhov at openvz.org>
Date: Mon Feb 19 14:22:34 2018 +0300
fuse_kio_pcs: implement truncate
The idea of the truncate implementation is fairly simple:
grow: (similar to userspace)
1) all writes beyond i_size are queued to inode's grow_queue,
2) update size via userspace
3) resubmit pending writes
shrink:
1) stop all I/O to the inode
a) writes are blocked by i_mutex
b) reads are queued to shrink_queue
2) update size via userspace
3) resubmit pending reads
TODO: Maybe it is simpler to force read requests to grab
i_mutex while a shrink is in progress, to avoid queuing.
https://jira.sw.ru/browse/PSBM-80680
Signed-off-by: Dmitry Monakhov <dmonakhov at openvz.org>
---
fs/fuse/dir.c | 2 +
fs/fuse/kio/pcs/fuse_io.c | 2 +
fs/fuse/kio/pcs/pcs_client_types.h | 8 +
fs/fuse/kio/pcs/pcs_cluster.h | 7 +
fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 318 +++++++++++++++++++++++++++++++++----
fs/fuse/kio/pcs/pcs_map.h | 5 +
6 files changed, 315 insertions(+), 27 deletions(-)
diff --git a/fs/fuse/dir.c b/fs/fuse/dir.c
index d4427c56012b..0ae0344be3c5 100644
--- a/fs/fuse/dir.c
+++ b/fs/fuse/dir.c
@@ -977,6 +977,7 @@ static int fuse_do_getattr(struct inode *inode, struct kstat *stat,
req->in.args[0].size = sizeof(inarg);
req->in.args[0].value = &inarg;
req->out.numargs = 1;
+ req->io_inode = inode;
if (fc->minor < 9)
req->out.args[0].size = FUSE_COMPAT_ATTR_OUT_SIZE;
else
@@ -1639,6 +1640,7 @@ void fuse_set_nowrite(struct inode *inode)
BUG_ON(fi->writectr < 0);
fi->writectr += FUSE_NOWRITE;
spin_unlock(&fc->lock);
+ inode_dio_wait(inode);
wait_event(fi->page_waitq, fi->writectr == FUSE_NOWRITE);
}
diff --git a/fs/fuse/kio/pcs/fuse_io.c b/fs/fuse/kio/pcs/fuse_io.c
index c9eaa8d453db..5884fe25a20f 100644
--- a/fs/fuse/kio/pcs/fuse_io.c
+++ b/fs/fuse/kio/pcs/fuse_io.c
@@ -37,6 +37,7 @@ static void on_read_done(struct pcs_fuse_req *r, size_t size)
DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, r->req.in.h.opcode, r->req.out.h.error);
r->req.out.args[0].size = size;
+ inode_dio_end(r->req.io_inode);
request_end(pfc->fc, &r->req);
}
@@ -56,6 +57,7 @@ static void on_write_done(struct pcs_fuse_req *r, off_t pos, size_t size)
out->size = size;
DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, r->req.in.h.opcode, r->req.out.h.error);
+ inode_dio_end(r->req.io_inode);
request_end(pfc->fc, &r->req);
}
diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
index 3bffd4992221..0be1caff7b46 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -53,6 +53,14 @@ struct pcs_dentry_info {
PCS_FILETIME_T local_mtime;
struct pcs_mapping mapping;
struct pcs_cluster_core *cluster;
+ spinlock_t lock;
+ struct {
+ struct work_struct work;
+ unsigned long long shrink;
+ unsigned long long required;
+ struct list_head grow_queue;
+ struct list_head shrink_queue;
+ } size;
struct fuse_inode *inode;
};
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 3a8116b705df..8b58d3cd946b 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -8,6 +8,12 @@ struct fuse_conn;
/* Try to follows pcs/client/fused structure style */
struct pcs_fuse_exec_ctx {
struct pcs_int_request ireq;
+ /* The file size control block */
+ struct {
+ unsigned long long required;
+ unsigned char granted;
+ unsigned char waiting;
+ } size;
struct {
pcs_api_iorequest_t req;
struct bio_vec *bvec;
@@ -24,6 +30,7 @@ struct pcs_fuse_exec_ctx {
struct pcs_fuse_req {
struct fuse_req req;
+ void (*end)(struct fuse_conn *, struct fuse_req *);
struct pcs_fuse_exec_ctx exec; /* Zero initialized context */
};
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 1b21fc9f9ffe..29d62faa8612 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -236,6 +236,7 @@ static int fuse_pcs_kdirect_claim_op(struct fuse_conn *fc, struct file *file,
fuse_put_request(fc, req);
return err;
}
+static void fuse_size_grow_work(struct work_struct *w);
static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct inode *inode)
{
@@ -263,6 +264,11 @@ static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct ino
/* di.id.name.data = name; */
/* di.id.name.len = id->name.len; */
+ spin_lock_init(&di->lock);
+ INIT_LIST_HEAD(&di->size.grow_queue);
+ INIT_LIST_HEAD(&di->size.shrink_queue);
+ INIT_WORK(&di->size.work, fuse_size_grow_work);
+
pcs_mapping_init(&pfc->cc, &di->mapping);
pcs_set_fileinfo(di, &info);
di->cluster = &pfc->cc;
@@ -317,6 +323,8 @@ void kpcs_inode_release(struct fuse_inode *fi)
if(!di)
return;
+ BUG_ON(!list_empty(&di->size.grow_queue));
+ BUG_ON(!list_empty(&di->size.shrink_queue));
pcs_mapping_invalidate(&di->mapping);
pcs_mapping_deinit(&di->mapping);
/* TODO: properly destroy dentry info here!! */
@@ -573,6 +581,178 @@ void ireq_destroy(struct pcs_int_request *ireq)
kmem_cache_free(pcs_ireq_cachep, ireq);
}
+/*
+ * Synchronously send FUSE_SETATTR(FATTR_SIZE) to set the size of @inode
+ * to @size.  Used by fuse_size_grow_work() to extend i_size before writes
+ * parked beyond EOF are allowed to proceed.
+ *
+ * The request carries io_inode, so the kio send hook (the FUSE_SETATTR
+ * case in kpcs_req_send()) sees it and refreshes the cached file size on
+ * completion.
+ *
+ * Return: 0 on success, PTR_ERR() of a failed request allocation, or the
+ * error from the reply header (req->out.h.error).
+ */
+static int submit_size_grow(struct inode *inode, unsigned long long size)
+{
+	struct fuse_conn *fc = get_fuse_conn(inode);
+	struct fuse_setattr_in inarg;
+	struct fuse_attr_out outarg;
+	struct fuse_req *req;
+	int err;
+
+	/*
+	 * Caller comes here without i_mutex, but vfs truncate is blocked in
+	 * inode_dio_wait() (see fuse_set_nowrite()) because the parked writes
+	 * hold a dio reference.
+	 */
+	BUG_ON(!atomic_read(&inode->i_dio_count));
+
+	TRACE("ino:%lu size:%llu \n", inode->i_ino, size);
+
+	req = fuse_get_req_nopages(fc);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	/* Explicit memset so no stack garbage (padding included) goes out in the request */
+	memset(&inarg, 0, sizeof(inarg));
+	memset(&outarg, 0, sizeof(outarg));
+
+	inarg.valid |= FATTR_SIZE;
+	inarg.size = size;
+
+	req->io_inode = inode;
+	req->in.h.opcode = FUSE_SETATTR;
+	req->in.h.nodeid = get_node_id(inode);
+	req->in.numargs = 1;
+	req->in.args[0].size = sizeof(inarg);
+	req->in.args[0].value = &inarg;
+	req->out.numargs = 1;
+	req->out.args[0].size = sizeof(outarg);
+	req->out.args[0].value = &outarg;
+
+	fuse_request_send(fc, req);
+	err = req->out.h.error;
+	fuse_put_request(fc, req);
+
+	return err;
+}
+
+/*
+ * Work item that extends i_size for writes parked on di->size.grow_queue
+ * by wait_grow().
+ *
+ * One FUSE_SETATTR is sent for the largest end offset currently queued.
+ * Afterwards every queued write whose end offset is covered by that size
+ * is marked granted and requeued to the cluster.  Writes queued while the
+ * SETATTR was in flight may need a larger size, so the work re-arms itself
+ * if anything is left on the queue.
+ */
+static void fuse_size_grow_work(struct work_struct *w)
+{
+	struct pcs_dentry_info *di = container_of(w, struct pcs_dentry_info, size.work);
+	struct inode *inode = &di->inode->inode;
+	struct pcs_int_request *ireq, *next;
+	unsigned long long size = 0;
+	LIST_HEAD(to_submit);
+	int err;
+
+	spin_lock(&di->lock);
+	BUG_ON(di->size.shrink);
+	/*
+	 * Nothing to do if a grow SETATTR is already in flight
+	 * (di->size.required is non-zero for its duration), or if the queue
+	 * is empty.  The latter happens when wait_grow() re-queued this work
+	 * after a previous run had started and that run then drained the
+	 * whole queue.  Going on would send SETATTR with size 0, which
+	 * kpcs_req_send() treats as a shrink.
+	 */
+	if (di->size.required || list_empty(&di->size.grow_queue)) {
+		spin_unlock(&di->lock);
+		return;
+	}
+	list_for_each_entry(ireq, &di->size.grow_queue, list) {
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+		TRACE("ino:%lu r(%p)->size:%llu required:%llu\n", inode->i_ino, r, r->exec.size.required, size);
+
+		BUG_ON(!r->exec.size.required);
+		if (size < r->exec.size.required)
+			size = r->exec.size.required;
+	}
+	di->size.required = size;
+	spin_unlock(&di->lock);
+
+	err = submit_size_grow(inode, size);
+	if (err) {
+		/* NOTE(review): writes below are still granted on failure, as before */
+		TRACE("ino:%lu grow to %llu failed: %d\n", inode->i_ino, size, err);
+	}
+
+	spin_lock(&di->lock);
+	BUG_ON(di->size.shrink);
+	BUG_ON(di->size.required != size);
+	list_for_each_entry_safe(ireq, next, &di->size.grow_queue, list) {
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+		BUG_ON(!r->exec.size.required);
+		BUG_ON(!r->exec.size.waiting);
+		BUG_ON(r->exec.size.granted);
+		/* Writes queued during the SETATTR may need more than @size; keep them */
+		if (size >= r->exec.size.required) {
+			TRACE("resubmit ino:%lu r(%p)->size:%llu required:%llu\n", inode->i_ino, r, r->exec.size.required, size);
+
+			r->exec.size.waiting = 0;
+			r->exec.size.granted = 1;
+			list_move(&ireq->list, &to_submit);
+		}
+	}
+	di->size.required = 0;
+	if (!list_empty(&di->size.grow_queue))
+		queue_work(pcs_wq, &di->size.work);
+	spin_unlock(&di->lock);
+
+	pcs_cc_requeue(di->cluster, &to_submit);
+}
+
+/*
+ * Park write @r on di->size.grow_queue until i_size reaches @required.
+ * The grow work is kicked only when no grow is in flight
+ * (di->size.required == 0).  Otherwise the running work re-arms itself for
+ * whatever is still queued when it finishes.  Caller holds di->lock.
+ */
+static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsigned long long required)
+{
+	struct pcs_int_request *ireq = &r->exec.ireq;
+	int grow_idle;
+
+	assert_spin_locked(&di->lock);
+	BUG_ON(r->exec.size.waiting);
+	BUG_ON(r->req.in.h.opcode != FUSE_WRITE);
+
+	grow_idle = !di->size.required;
+	TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
+	      di->size.required, r, required);
+
+	r->exec.size.waiting = 1;
+	r->exec.size.required = required;
+	list_add_tail(&ireq->list, &di->size.grow_queue);
+
+	if (grow_idle)
+		queue_work(pcs_wq, &di->size.work);
+}
+/*
+ * Park read @r on di->size.shrink_queue while a shrink is in progress.
+ * Only reads can get here.  Caller holds di->lock.
+ */
+static void wait_shrink(struct pcs_fuse_req *r, struct pcs_dentry_info *di)
+{
+	struct list_head *entry = &r->exec.ireq.list;
+
+	assert_spin_locked(&di->lock);
+	BUG_ON(r->exec.size.waiting);
+	/* Writes already blocked via fuse_set_nowrite */
+	BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+	TRACE("insert ino:%ld r:%p\n", r->req.io_inode->i_ino, r);
+	r->exec.size.waiting = 1;
+	list_add_tail(entry, &di->size.shrink_queue);
+}
+
+/*
+ * Check a FUSE_READ/FUSE_WRITE request against the cached file size and
+ * defer it if necessary.  Takes di->lock.
+ *
+ * Return:
+ *  0 - request is prepared (pcs_fuse_prep_io() done, dio reference taken)
+ *      and ready for submission;
+ * -1 - read starting at or beyond EOF: out.args[0].size is set to 0, no
+ *      I/O is prepared and no dio reference is taken; do not submit it;
+ *  1 - request was parked: on shrink_queue while a shrink is in progress
+ *      (not prepared, no dio reference), or on grow_queue until i_size is
+ *      extended (prepared, dio reference taken).
+ */
+static int pcs_fuse_prep_rw(struct pcs_fuse_req *r)
+{
+	struct fuse_req *req = &r->req;
+	struct pcs_dentry_info *di = pcs_inode_from_fuse(get_fuse_inode(req->io_inode));
+	int deferred = 0;
+
+	spin_lock(&di->lock);
+
+	/* Defer every request while a shrink is pending to prevent livelock */
+	if (di->size.shrink) {
+		wait_shrink(r, di);
+		spin_unlock(&di->lock);
+		return 1;
+	}
+
+	if (req->in.h.opcode != FUSE_READ) {
+		/* Write: anything ending past i_size waits for the grow work */
+		struct fuse_write_in *win = &req->misc.write.in;
+
+		if (win->offset + win->size > di->fileinfo.attr.size) {
+			wait_grow(r, di, win->offset + win->size);
+			deferred = 1;
+		}
+		pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, win->offset, win->size);
+	} else {
+		struct fuse_read_in *rin = &req->misc.read.in;
+		size_t len = rin->size;
+
+		if (rin->offset + rin->size > di->fileinfo.attr.size) {
+			/* Read entirely past EOF: complete with zero bytes */
+			if (rin->offset >= di->fileinfo.attr.size) {
+				req->out.args[0].size = 0;
+				spin_unlock(&di->lock);
+				return -1;
+			}
+			/* Clip the read at EOF */
+			len = di->fileinfo.attr.size - rin->offset;
+		}
+		pcs_fuse_prep_io(r, PCS_REQ_T_READ, rin->offset, len);
+	}
+	inode_dio_begin(req->io_inode);
+	spin_unlock(&di->lock);
+
+	return deferred;
+}
+
static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req, int async)
{
struct pcs_fuse_req *r = pcs_req_from_fuse(req);
@@ -584,32 +764,23 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
BUG_ON(req->cache != pcs_fuse_req_cachep);
/* Init pcs_fuse_req */
- memset(&r->exec.io, 0, sizeof(r->exec.io));
- memset(&r->exec.ctl, 0, sizeof(r->exec.ctl));
+ memset(&r->exec, 0, sizeof(r->exec));
/* Use inline request structure */
ireq = &r->exec.ireq;
ireq_init(di, ireq);
switch (r->req.in.h.opcode) {
- case FUSE_WRITE: {
- struct fuse_write_in *in = &r->req.misc.write.in;
-
- pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size);
- goto submit;
- }
+ case FUSE_WRITE:
case FUSE_READ: {
- struct fuse_read_in *in = &r->req.misc.read.in;
- size_t size = in->size;
-
- if (in->offset + in->size > di->fileinfo.attr.size) {
- if (in->offset >= di->fileinfo.attr.size) {
- req->out.args[0].size = 0;
- break;
- }
- size = di->fileinfo.attr.size - in->offset;
- }
- pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size);
- goto submit;
+ int ret;
+
+ ret = pcs_fuse_prep_rw(r);
+ if (!ret)
+ goto submit;
+ if (ret > 0)
+ /* Pended, nothing to do. */
+ return;
+ break;
}
case FUSE_FSYNC:
pcs_fuse_prep_io(r, PCS_REQ_T_SYNC, 0, 0);
@@ -627,12 +798,69 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
ireq_process(ireq);
}
+/*
+ * Common completion for size-changing FUSE_SETATTR requests.  Refreshes
+ * the cached file size from the reply when userspace reported success,
+ * then chains to the ->end callback the request carried before
+ * kpcs_req_send() replaced it (saved in r->end).
+ */
+static void kpcs_setattr_end(struct fuse_conn *fc, struct fuse_req *req)
+{
+	struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+	struct inode *inode = req->io_inode;
+	struct fuse_inode *fi = get_fuse_inode(inode);
+	struct fuse_attr_out *outarg = (void *)req->out.args[0].value;
+	struct pcs_dentry_info *di;
+
+	BUG_ON(req->in.h.opcode != FUSE_SETATTR);
+	BUG_ON(!fi->private);
+
+	di = pcs_inode_from_fuse(fi);
+	spin_lock(&di->lock);
+	TRACE("update size: ino:%lu old_sz:%lld new:%lld\n", inode->i_ino,
+	      di->fileinfo.attr.size, outarg->attr.size);
+
+	if (req->out.h.error == 0)
+		di->fileinfo.attr.size = outarg->attr.size;
+	spin_unlock(&di->lock);
+
+	if (r->end)
+		r->end(fc, req);
+}
-int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
+/*
+ * Completion for a shrinking FUSE_SETATTR (installed by kpcs_req_send()).
+ * Runs the common size update via kpcs_setattr_end(), then clears
+ * di->size.shrink and resubmits every read that wait_shrink() parked on
+ * di->size.shrink_queue while the shrink was in flight.
+ */
+static void _pcs_shrink_end(struct fuse_conn *fc, struct fuse_req *req)
{
struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
struct fuse_inode *fi = get_fuse_inode(req->io_inode);
+ struct pcs_dentry_info *di = fi->private;
+ LIST_HEAD(dispose);
+
+ /* Refresh cached size (on success) and chain to the saved ->end */
+ kpcs_setattr_end(fc, req);
+
+ spin_lock(&di->lock);
+ /* No grow may be queued or in flight during a shrink */
+ BUG_ON(!di->size.shrink);
+ BUG_ON(di->size.required);
+ BUG_ON(!list_empty(&di->size.grow_queue));
+
+ /* Detach the parked reads and reopen the inode for new I/O under the lock */
+ list_splice_init(&di->size.shrink_queue, &dispose);
+ di->size.shrink = 0;
+ spin_unlock(&di->lock);
+ /*
+  * Resubmit outside di->lock: pcs_fuse_submit() re-runs the i_size
+  * checks in pcs_fuse_prep_rw(), which takes di->lock itself.
+  */
+ while (!list_empty(&dispose)) {
+ struct pcs_int_request* ireq = list_first_entry(&dispose, struct pcs_int_request, list);
+ struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+ BUG_ON(!r->exec.size.waiting);
+ BUG_ON(r->exec.size.granted);
+ BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+ TRACE("resubmit %p\n", &r->req);
+ list_del_init(&ireq->list);
+ r->exec.size.waiting = 0;
+ pcs_fuse_submit(pfc, &r->req, 1);
+ }
+}
+
+/*
+ * Completion for a non-shrinking FUSE_SETATTR (installed by
+ * kpcs_req_send()).  Unlike _pcs_shrink_end() there are no parked reads
+ * to resubmit, so only the common size update is needed.
+ */
+static void _pcs_grow_end(struct fuse_conn *conn, struct fuse_req *rq)
+{
+	kpcs_setattr_end(conn, rq);
+}
+
+static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
+{
+ struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
+ struct fuse_inode *fi = get_fuse_inode(req->io_inode);
if (!fc->initialized || fc->conn_error)
return 1;
@@ -643,7 +871,7 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
*/
BUG_ON(!list_empty(&req->list));
- TRACE(" Enter req:%p op:%d bg:%d lk:%d\n", req, req->in.h.opcode, bg, lk);
+ TRACE(" Enter req:%p op:%d end:%p bg:%d lk:%d\n", req, req->in.h.opcode, req->end, bg, lk);
/* TODO: This is just a crunch, Conn cleanup requires sane locking */
if (req->in.h.opcode == FUSE_DESTROY) {
@@ -653,13 +881,49 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
spin_unlock(&fc->lock);
return 1;
}
- if ((req->in.h.opcode != FUSE_READ &&
- req->in.h.opcode != FUSE_WRITE))
- return 1;
+ switch (req->in.h.opcode) {
+ case FUSE_SETATTR: {
+ struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+ struct fuse_setattr_in *inarg = (void*) req->in.args[0].value;
+ struct pcs_dentry_info *di = pcs_inode_from_fuse(fi);
+ int shrink = 0;
+
+ if (!(inarg->valid & FATTR_SIZE))
+ return 1;
+
- fi = get_fuse_inode(req->io_inode);
- if (!fi->private)
+ spin_lock(&di->lock);
+ if (inarg->size < di->fileinfo.attr.size) {
+ BUG_ON(di->size.shrink);
+ di->size.shrink = shrink = 1;
+ }
+ spin_unlock(&di->lock);
+ r->end = req->end;
+ if (shrink) {
+ BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
+ /* wait for aio reads in flight */
+ inode_dio_wait(req->io_inode);
+ /*
+ * Writebackcache was flushed already so it is safe to
+ * drop pcs_mapping
+ */
+ pcs_map_invalidate_tail(&di->mapping, inarg->size);
+ req->end = _pcs_shrink_end;
+ } else
+ req->end = _pcs_grow_end;
+ return 1;
+ }
+ case FUSE_READ:
+ case FUSE_WRITE:
+ fi = get_fuse_inode(req->io_inode);
+ if (!fi->private)
+ return 1;
+
+ break;
+ default:
return 1;
+ }
+
__clear_bit(FR_BACKGROUND, &req->flags);
__clear_bit(FR_PENDING, &req->flags);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 754e0f177d46..11176b2b80d5 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -260,5 +260,10 @@ static inline struct pcs_map_entry *pcs_map_get(struct pcs_map_entry *m)
return m;
}
+/*
+ * Truncate the cached chunk map of @mapping starting at the chunk that
+ * contains @offset, via map_truncate_tail().  @offset is rounded down to a
+ * chunk boundary, so the chunk partially covered by the new EOF is
+ * dropped as well.
+ */
+static inline void pcs_map_invalidate_tail(struct pcs_mapping * mapping, u64 offset)
+{
+	/*
+	 * Round down in 64-bit arithmetic: going through an unsigned long
+	 * index would truncate large offsets on 32-bit builds.
+	 */
+	u64 start = (offset >> mapping->chunk_size_bits) << mapping->chunk_size_bits;
+
+	map_truncate_tail(mapping, start);
+}
#endif /* _PCS_MAP_H_ */
More information about the Devel
mailing list