[Devel] [PATCH RHEL7 COMMIT] fuse_kio_pcs: implement truncate

Konstantin Khorenko khorenko at virtuozzo.com
Mon Feb 19 14:22:34 MSK 2018


The commit is pushed to "branch-rh7-3.10.0-693.17.1.vz7.45.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.17.1.vz7.43.7
------>
commit 8bfe46448fb9b35444e8cc7963b472459039758d
Author: Dmitry Monakhov <dmonakhov at openvz.org>
Date:   Mon Feb 19 14:22:34 2018 +0300

    fuse_kio_pcs: implement truncate
    
    The idea of the truncate implementation is fairly simple.
    grow (similar to userspace):
     1) all writes beyond i_size are queued on the inode's grow_queue,
     2) update the size via userspace,
     3) resubmit the pending writes.
    
    shrink:
     1) stop all IO to the inode:
        a) writes are blocked by i_mutex,
        b) reads are queued on the shrink_queue,
     2) update the size via userspace,
     3) resubmit the pending reads.
    
    TODO: Maybe it is simpler to force read requests to grab
          i_mutex while a shrink is in progress, to avoid queuing.
    
    https://jira.sw.ru/browse/PSBM-80680
    Signed-off-by: Dmitry Monakhov <dmonakhov at openvz.org>
---
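A minimal, self-contained user-space sketch of the grow path described above.
The names here (pending_write, grow_worker, update_size_via_userspace) are
purely illustrative and do not exist in the patch; the actual kernel-side code
is fuse_size_grow_work() and submit_size_grow() in the diff below.

	#include <stdio.h>
	#include <stdlib.h>

	struct pending_write {
		unsigned long long end;		/* offset + size of the queued write */
		struct pending_write *next;
	};

	struct file_state {
		unsigned long long i_size;	/* cached file size */
		struct pending_write *grow_queue;
	};

	/* Step 1 of "grow": a write past i_size is queued instead of submitted. */
	static void queue_grow(struct file_state *f, unsigned long long end)
	{
		struct pending_write *w = malloc(sizeof(*w));

		if (!w)
			abort();
		w->end = end;
		w->next = f->grow_queue;
		f->grow_queue = w;
	}

	/* Stand-in for the FUSE_SETATTR round trip to userspace (step 2). */
	static void update_size_via_userspace(struct file_state *f,
					      unsigned long long size)
	{
		f->i_size = size;
	}

	/* Steps 2 and 3: grow once to the largest required size, then resubmit. */
	static void grow_worker(struct file_state *f)
	{
		unsigned long long required = 0;
		struct pending_write *w;

		for (w = f->grow_queue; w; w = w->next)
			if (w->end > required)
				required = w->end;

		update_size_via_userspace(f, required);

		while ((w = f->grow_queue)) {
			f->grow_queue = w->next;
			printf("resubmit write ending at %llu (i_size now %llu)\n",
			       w->end, f->i_size);
			free(w);
		}
	}

	int main(void)
	{
		struct file_state f = { .i_size = 100, .grow_queue = NULL };

		queue_grow(&f, 150);	/* write beyond i_size, gets queued */
		queue_grow(&f, 300);
		grow_worker(&f);	/* one size update to 300, then both resubmitted */
		return 0;
	}

The shrink path is the mirror image: writes are blocked by i_mutex and
fuse_set_nowrite(), reads are parked on shrink_queue, and _pcs_shrink_end()
resubmits them once the FUSE_SETATTR reply has updated the cached size.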
 fs/fuse/dir.c                      |   2 +
 fs/fuse/kio/pcs/fuse_io.c          |   2 +
 fs/fuse/kio/pcs/pcs_client_types.h |   8 +
 fs/fuse/kio/pcs/pcs_cluster.h      |   7 +
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 318 +++++++++++++++++++++++++++++++++----
 fs/fuse/kio/pcs/pcs_map.h          |   5 +
 6 files changed, 315 insertions(+), 27 deletions(-)

diff --git a/fs/fuse/dir.c b/fs/fuse/dir.c
index d4427c56012b..0ae0344be3c5 100644
--- a/fs/fuse/dir.c
+++ b/fs/fuse/dir.c
@@ -977,6 +977,7 @@ static int fuse_do_getattr(struct inode *inode, struct kstat *stat,
 	req->in.args[0].size = sizeof(inarg);
 	req->in.args[0].value = &inarg;
 	req->out.numargs = 1;
+	req->io_inode = inode;
 	if (fc->minor < 9)
 		req->out.args[0].size = FUSE_COMPAT_ATTR_OUT_SIZE;
 	else
@@ -1639,6 +1640,7 @@ void fuse_set_nowrite(struct inode *inode)
 	BUG_ON(fi->writectr < 0);
 	fi->writectr += FUSE_NOWRITE;
 	spin_unlock(&fc->lock);
+	inode_dio_wait(inode);
 	wait_event(fi->page_waitq, fi->writectr == FUSE_NOWRITE);
 }
 
diff --git a/fs/fuse/kio/pcs/fuse_io.c b/fs/fuse/kio/pcs/fuse_io.c
index c9eaa8d453db..5884fe25a20f 100644
--- a/fs/fuse/kio/pcs/fuse_io.c
+++ b/fs/fuse/kio/pcs/fuse_io.c
@@ -37,6 +37,7 @@ static void on_read_done(struct pcs_fuse_req *r, size_t size)
 
 	DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, r->req.in.h.opcode, r->req.out.h.error);
 	r->req.out.args[0].size = size;
+	inode_dio_end(r->req.io_inode);
 	request_end(pfc->fc, &r->req);
 }
 
@@ -56,6 +57,7 @@ static void on_write_done(struct pcs_fuse_req *r, off_t pos, size_t size)
 	out->size = size;
 
 	DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, r->req.in.h.opcode, r->req.out.h.error);
+	inode_dio_end(r->req.io_inode);
 	request_end(pfc->fc, &r->req);
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_client_types.h b/fs/fuse/kio/pcs/pcs_client_types.h
index 3bffd4992221..0be1caff7b46 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -53,6 +53,14 @@ struct pcs_dentry_info {
 	PCS_FILETIME_T		local_mtime;
 	struct pcs_mapping	mapping;
 	struct pcs_cluster_core	*cluster;
+	spinlock_t		lock;
+	struct {
+		struct work_struct	work;
+		unsigned long long	shrink;
+		unsigned long long	required;
+		struct list_head	grow_queue;
+		struct list_head	shrink_queue;
+	} size;
 	struct fuse_inode	*inode;
 };
 
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 3a8116b705df..8b58d3cd946b 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -8,6 +8,12 @@ struct fuse_conn;
 /* Try to follows pcs/client/fused structure style */
 struct pcs_fuse_exec_ctx {
 	struct pcs_int_request	ireq;
+	/* The file size control block */
+	struct {
+		unsigned long long	required;
+		unsigned char		granted;
+		unsigned char		waiting;
+	} size;
 	struct {
 		pcs_api_iorequest_t	req;
 		struct bio_vec		*bvec;
@@ -24,6 +30,7 @@ struct pcs_fuse_exec_ctx {
 
 struct pcs_fuse_req {
 	struct fuse_req req;
+	void (*end)(struct fuse_conn *, struct fuse_req *);
 	struct pcs_fuse_exec_ctx exec;	/* Zero initialized context */
 };
 
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 1b21fc9f9ffe..29d62faa8612 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -236,6 +236,7 @@ static int fuse_pcs_kdirect_claim_op(struct fuse_conn *fc, struct file *file,
 	fuse_put_request(fc, req);
 	return err;
 }
+static void  fuse_size_grow_work(struct work_struct *w);
 
 static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct inode *inode)
 {
@@ -263,6 +264,11 @@ static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct ino
 	/* di.id.name.data  = name; */
 	/* di.id.name.len   = id->name.len; */
 
+	spin_lock_init(&di->lock);
+	INIT_LIST_HEAD(&di->size.grow_queue);
+	INIT_LIST_HEAD(&di->size.shrink_queue);
+	INIT_WORK(&di->size.work, fuse_size_grow_work);
+
 	pcs_mapping_init(&pfc->cc, &di->mapping);
 	pcs_set_fileinfo(di, &info);
 	di->cluster = &pfc->cc;
@@ -317,6 +323,8 @@ void kpcs_inode_release(struct fuse_inode *fi)
 	if(!di)
 		return;
 
+	BUG_ON(!list_empty(&di->size.grow_queue));
+	BUG_ON(!list_empty(&di->size.shrink_queue));
 	pcs_mapping_invalidate(&di->mapping);
 	pcs_mapping_deinit(&di->mapping);
 	/* TODO: properly destroy dentry info here!! */
@@ -573,6 +581,178 @@ void ireq_destroy(struct pcs_int_request *ireq)
 	kmem_cache_free(pcs_ireq_cachep, ireq);
 }
 
+static int submit_size_grow(struct inode *inode, unsigned long long size)
+{
+	struct fuse_conn *fc = get_fuse_conn(inode);
+	struct fuse_setattr_in inarg;
+	struct fuse_attr_out outarg;
+	struct fuse_req *req;
+	int err;
+
+	/* The caller comes here w/o i_mutex, but vfs_truncate is blocked
+	   at inode_dio_wait(), see fuse_set_nowrite().
+	 */
+	BUG_ON(!atomic_read(&inode->i_dio_count));
+
+	TRACE("ino:%ld size:%lld \n",inode->i_ino, size);
+
+	req = fuse_get_req_nopages(fc);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	memset(&inarg, 0, sizeof(inarg));
+	memset(&outarg, 0, sizeof(outarg));
+
+	inarg.valid |= FATTR_SIZE;
+	inarg.size = size;
+
+	req->io_inode = inode;
+	req->in.h.opcode = FUSE_SETATTR;
+	req->in.h.nodeid = get_node_id(inode);
+	req->in.numargs = 1;
+	req->in.args[0].size = sizeof(inarg);
+	req->in.args[0].value = &inarg;
+	req->out.numargs = 1;
+	req->out.args[0].size = sizeof(outarg);
+	req->out.args[0].value = &outarg;
+
+	fuse_request_send(fc, req);
+	err = req->out.h.error;
+	fuse_put_request(fc, req);
+
+	return err;
+
+}
+
+static void fuse_size_grow_work(struct work_struct *w)
+{
+	struct pcs_dentry_info* di = container_of(w, struct pcs_dentry_info, size.work);
+	struct inode *inode = &di->inode->inode;
+	struct pcs_int_request* ireq, *next;
+	unsigned long long size = 0;
+	LIST_HEAD(to_submit);
+
+	spin_lock(&di->lock);
+	BUG_ON(di->size.shrink);
+	if (di->size.required) {
+		spin_unlock(&di->lock);
+		return;
+	}
+	list_for_each_entry(ireq, &di->size.grow_queue, list) {
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+		TRACE("ino:%ld r(%p)->size:%lld	 required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+
+		BUG_ON(!r->exec.size.required);
+		if (size < r->exec.size.required)
+			size = r->exec.size.required;
+	}
+	di->size.required = size;
+	spin_unlock(&di->lock);
+	submit_size_grow(inode, size);
+
+	spin_lock(&di->lock);
+	BUG_ON(di->size.shrink);
+	BUG_ON(di->size.required != size);
+	list_for_each_entry_safe(ireq, next, &di->size.grow_queue, list) {
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+		BUG_ON(!r->exec.size.required);
+		BUG_ON(!r->exec.size.waiting);
+		BUG_ON(r->exec.size.granted);
+		if (size >= r->exec.size.required) {
+			TRACE("resubmit ino:%ld r(%p)->size:%lld  required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+
+			r->exec.size.waiting = 0;
+			r->exec.size.granted = 1;
+			list_move(&ireq->list, &to_submit);
+		}
+	}
+	di->size.required = 0;
+	if (!list_empty(&di->size.grow_queue))
+		queue_work(pcs_wq, &di->size.work);
+	spin_unlock(&di->lock);
+
+	pcs_cc_requeue(di->cluster, &to_submit);
+}
+
+static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, unsigned long long required)
+{
+	assert_spin_locked(&di->lock);
+	BUG_ON(r->exec.size.waiting);
+	BUG_ON(r->req.in.h.opcode != FUSE_WRITE);
+
+	TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", r->req.io_inode->i_ino,
+	      di->size.required, r, required);
+	r->exec.size.required = required;
+	r->exec.size.waiting = 1;
+	list_add_tail(&r->exec.ireq.list, &di->size.grow_queue);
+
+	if (!di->size.required)
+		queue_work(pcs_wq, &di->size.work);
+}
+static void wait_shrink(struct pcs_fuse_req *r, struct pcs_dentry_info *di)
+{
+	assert_spin_locked(&di->lock);
+	BUG_ON(r->exec.size.waiting);
+	/* Writes already blocked via fuse_set_nowrite */
+	BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+	TRACE("insert ino:%ld r:%p\n", r->req.io_inode->i_ino, r);
+	r->exec.size.waiting = 1;
+	list_add_tail(&r->exec.ireq.list, &di->size.shrink_queue);
+}
+
+/*
+ * Check the i_size boundary and defer the request if necessary.
+ * Return code:
+ *  0: ready for submission
+ * -1: should fail the request
+ *  1: request placed on the pending queue
+ */
+static int pcs_fuse_prep_rw(struct pcs_fuse_req *r)
+{
+	struct fuse_inode *fi = get_fuse_inode(r->req.io_inode);
+	struct pcs_dentry_info *di = pcs_inode_from_fuse(fi);
+	int ret = 0;
+
+	spin_lock(&di->lock);
+	/* Defer all requests if a shrink is requested, to prevent livelock */
+	if (di->size.shrink) {
+		wait_shrink(r, di);
+		spin_unlock(&di->lock);
+		return 1;
+	}
+	if (r->req.in.h.opcode == FUSE_READ) {
+		size_t size;
+		struct fuse_read_in *in = &r->req.misc.read.in;
+
+		size = in->size;
+		if (in->offset + in->size > di->fileinfo.attr.size) {
+			if (in->offset >= di->fileinfo.attr.size) {
+				r->req.out.args[0].size = 0;
+				spin_unlock(&di->lock);
+				return -1;
+			}
+			size = di->fileinfo.attr.size - in->offset;
+		}
+		pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size);
+	} else {
+		struct fuse_write_in *in = &r->req.misc.write.in;
+
+		if (in->offset + in->size > di->fileinfo.attr.size) {
+			wait_grow(r, di, in->offset + in->size);
+			ret = 1;
+		}
+		pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size);
+
+	}
+	inode_dio_begin(r->req.io_inode);
+	spin_unlock(&di->lock);
+
+	return ret;
+}
+
 static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req, int async)
 {
 	struct pcs_fuse_req *r = pcs_req_from_fuse(req);
@@ -584,32 +764,23 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 	BUG_ON(req->cache != pcs_fuse_req_cachep);
 
 	/* Init pcs_fuse_req */
-	memset(&r->exec.io, 0, sizeof(r->exec.io));
-	memset(&r->exec.ctl, 0, sizeof(r->exec.ctl));
+	memset(&r->exec, 0, sizeof(r->exec));
 	/* Use inline request structure */
 	ireq = &r->exec.ireq;
 	ireq_init(di, ireq);
 
 	switch (r->req.in.h.opcode) {
-	case FUSE_WRITE: {
-		struct fuse_write_in *in = &r->req.misc.write.in;
-
-		pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size);
-		goto submit;
-	}
+	case FUSE_WRITE:
 	case FUSE_READ: {
-		struct fuse_read_in *in = &r->req.misc.read.in;
-		size_t size = in->size;
-
-		if (in->offset + in->size > di->fileinfo.attr.size) {
-			if (in->offset >= di->fileinfo.attr.size) {
-				req->out.args[0].size = 0;
-				break;
-			}
-			size = di->fileinfo.attr.size - in->offset;
-		}
-		pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size);
-		goto submit;
+		int ret;
+
+		ret = pcs_fuse_prep_rw(r);
+		if (!ret)
+			goto submit;
+		if (ret > 0)
+			/* Deferred, nothing to do. */
+			return;
+		break;
 	}
 	case FUSE_FSYNC:
 		pcs_fuse_prep_io(r, PCS_REQ_T_SYNC, 0, 0);
@@ -627,12 +798,69 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
 		ireq_process(ireq);
 }
 
+static void kpcs_setattr_end(struct fuse_conn *fc, struct fuse_req *req)
+{
+	struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+	struct fuse_inode *fi = get_fuse_inode(req->io_inode);
+	struct fuse_attr_out *outarg = (void*) req->out.args[0].value;
+	struct pcs_dentry_info *di = fi->private;
+
+	BUG_ON(req->in.h.opcode != FUSE_SETATTR);
+	BUG_ON(!di);
+	di = pcs_inode_from_fuse(fi);
+	spin_lock(&di->lock);
+	TRACE("update size: ino:%lu old_sz:%lld new:%lld\n",req->io_inode->i_ino,
+	      di->fileinfo.attr.size, outarg->attr.size);
+
+	if (!req->out.h.error)
+		di->fileinfo.attr.size = outarg->attr.size;
+	spin_unlock(&di->lock);
+	if(r->end)
+		r->end(fc, req);
+}
 
-int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
+static void _pcs_shrink_end(struct fuse_conn *fc, struct fuse_req *req)
 {
 	struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
 	struct fuse_inode *fi = get_fuse_inode(req->io_inode);
+	struct pcs_dentry_info *di = fi->private;
+	LIST_HEAD(dispose);
+
+	kpcs_setattr_end(fc, req);
+
+	spin_lock(&di->lock);
+	BUG_ON(!di->size.shrink);
+	BUG_ON(di->size.required);
+	BUG_ON(!list_empty(&di->size.grow_queue));
+
+	list_splice_init(&di->size.shrink_queue, &dispose);
+	di->size.shrink = 0;
+	spin_unlock(&di->lock);
 
+	while (!list_empty(&dispose)) {
+		struct pcs_int_request* ireq = list_first_entry(&dispose, struct pcs_int_request, list);
+		struct pcs_fuse_req *r = container_of(ireq, struct pcs_fuse_req, exec.ireq);
+
+		BUG_ON(!r->exec.size.waiting);
+		BUG_ON(r->exec.size.granted);
+		BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+		TRACE("resubmit %p\n", &r->req);
+		list_del_init(&ireq->list);
+		r->exec.size.waiting = 0;
+		pcs_fuse_submit(pfc, &r->req, 1);
+	}
+}
+
+static void _pcs_grow_end(struct fuse_conn *fc, struct fuse_req *req)
+{
+	kpcs_setattr_end(fc, req);
+}
+
+static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
+{
+	struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
+	struct fuse_inode *fi = get_fuse_inode(req->io_inode);
 	if (!fc->initialized || fc->conn_error)
 		return 1;
 
@@ -643,7 +871,7 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
 	 */
 	BUG_ON(!list_empty(&req->list));
 
-	TRACE(" Enter req:%p op:%d bg:%d lk:%d\n", req, req->in.h.opcode, bg, lk);
+	TRACE(" Enter req:%p op:%d end:%p bg:%d lk:%d\n", req, req->in.h.opcode, req->end, bg, lk);
 
 	/* TODO: This is just a crunch, Conn cleanup requires sane locking */
 	if (req->in.h.opcode == FUSE_DESTROY) {
@@ -653,13 +881,49 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
 		spin_unlock(&fc->lock);
 		return 1;
 	}
-	if ((req->in.h.opcode != FUSE_READ &&
-	     req->in.h.opcode != FUSE_WRITE))
-		return 1;
+	switch (req->in.h.opcode) {
+	case FUSE_SETATTR: {
+		struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+		struct fuse_setattr_in *inarg = (void*) req->in.args[0].value;
+		struct pcs_dentry_info *di = pcs_inode_from_fuse(fi);
+		int shrink = 0;
+
+		if (!(inarg->valid & FATTR_SIZE))
+			return 1;
+
 
-	fi = get_fuse_inode(req->io_inode);
-	if (!fi->private)
+		spin_lock(&di->lock);
+		if (inarg->size < di->fileinfo.attr.size) {
+			BUG_ON(di->size.shrink);
+			di->size.shrink = shrink = 1;
+		}
+		spin_unlock(&di->lock);
+		r->end = req->end;
+		if (shrink) {
+			BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
+			/* wait for aio reads in flight */
+			inode_dio_wait(req->io_inode);
+			/*
+			 * The writeback cache was flushed already, so it is
+			 * safe to drop the pcs_mapping.
+			 */
+			pcs_map_invalidate_tail(&di->mapping, inarg->size);
+			req->end = _pcs_shrink_end;
+		} else
+			req->end = _pcs_grow_end;
+		return 1;
+	}
+	case FUSE_READ:
+	case FUSE_WRITE:
+		fi = get_fuse_inode(req->io_inode);
+		if (!fi->private)
+			return 1;
+
+		break;
+	default:
 		return 1;
+	}
+
 
 	__clear_bit(FR_BACKGROUND, &req->flags);
 	__clear_bit(FR_PENDING, &req->flags);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 754e0f177d46..11176b2b80d5 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -260,5 +260,10 @@ static inline struct pcs_map_entry *pcs_map_get(struct pcs_map_entry *m)
 	return m;
 }
 
+static inline void pcs_map_invalidate_tail(struct pcs_mapping * mapping, u64 offset)
+{
+	unsigned long index = offset >> mapping->chunk_size_bits;
 
+	map_truncate_tail(mapping, index << mapping->chunk_size_bits);
+}
 #endif /* _PCS_MAP_H_ */

