[Devel] [PATCH RHEL9 COMMIT] fuse: direct access to local cs repositories

Konstantin Khorenko khorenko at virtuozzo.com
Wed Nov 1 22:23:35 MSK 2023


The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-284.25.1.vz9.30.8
------>
commit 620a4d6bce88f5f27c8b7912d811bf9e3d89b869
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Fri Oct 6 18:42:28 2023 +0800

    fuse: direct access to local cs repositories
    
    For now, it is used only under limited circumstances:
    only for reads, only for journalless unencrypted CS. Extensions
    are possible, but not before we see the result.
    The value of this POC case is that it achieves the minimal physically
    possible latency; extensions will result in some degradation,
    which is to be evaluated separately.
    
    The idea is the following. When CS receives a read request
    from the local kernel client, it installs a map for this file
    into the kernel. Subsequent requests are not sent to CS,
    but processed directly in the kernel. What makes this case unique
    is that it completely avoids not only syscalls and context
    switches to user threads, but also achieves true zero copy.
    
    https://pmc.acronis.work/browse/VSTOR-54040
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: vStorage
---
 fs/fuse/Makefile                   |   3 +-
 fs/fuse/dev.c                      |  14 ++
 fs/fuse/fuse_i.h                   |   4 +
 fs/fuse/inode.c                    |   4 +-
 fs/fuse/ioctl.c                    |  13 +
 fs/fuse/kio/pcs/pcs_cluster.c      |   1 +
 fs/fuse/kio/pcs/pcs_cluster.h      |   5 +
 fs/fuse/kio/pcs/pcs_cs.c           |   7 +
 fs/fuse/kio/pcs/pcs_cs.h           |   5 +
 fs/fuse/kio/pcs/pcs_cs_accel.c     | 499 +++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  85 ++++++-
 fs/fuse/kio/pcs/pcs_ioctl.h        |  27 ++
 fs/fuse/kio/pcs/pcs_prot_types.h   |   1 +
 fs/fuse/kio/pcs/pcs_req.h          |  23 +-
 include/uapi/linux/fuse.h          |  10 +
 15 files changed, 692 insertions(+), 9 deletions(-)

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 37ef32ffa13d..afed0047b858 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -32,6 +32,7 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/pcs_rdma_io.o \
 	kio/pcs/pcs_rdma_rw.o \
 	kio/pcs/pcs_rdma_conn.o \
-	kio/pcs/pcs_net_addr.o
+	kio/pcs/pcs_net_addr.o \
+	kio/pcs/pcs_cs_accel.o
 
 virtiofs-y := virtio_fs.o
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 424180ff7ab2..eb3fc44fe324 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2428,6 +2428,27 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd,
 			res = 0;
 		}
 		break;
+	case FUSE_IOC_KIO_CALL:
+	{
+		struct fuse_kio_call req;
+		struct fuse_kio_ops *op;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		/* The name comes from userspace and need not be NUL-terminated */
+		req.name[sizeof(req.name) - 1] = '\0';
+		op = fuse_kio_get(NULL, req.name);
+		if (op == NULL)
+			return -EINVAL;
+		/* ->ioctl is optional, fuse_ioctl_common() checks it as well */
+		if (!op->ioctl) {
+			fuse_kio_put(op);
+			return -EINVAL;
+		}
+		res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
+		fuse_kio_put(op);
+		break;
+	}
 	default:
 		res = -ENOTTY;
 		break;
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index ce90c8283b97..e3654005abef 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -632,6 +632,7 @@ struct fuse_kio_ops {
 	void (*file_close)(struct file *file, struct inode *inode);
 	void (*inode_release)(struct fuse_inode *fi);
 	void (*kill_requests)(struct fuse_conn *fc, struct inode *inode);
+	int  (*ioctl)(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len); /* optional FUSE_IOC_KIO_CALL handler, may be NULL */
 
 };
 int fuse_register_kio(struct fuse_kio_ops *ops);
@@ -1529,4 +1530,7 @@ struct fuse_file *fuse_file_open(struct fuse_mount *fm, u64 nodeid,
 void fuse_file_release(struct inode *inode, struct fuse_file *ff,
 		       unsigned int open_flags, fl_owner_t id, bool isdir);
 
+struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name); /* release with fuse_kio_put() */
+void fuse_kio_put(struct fuse_kio_ops *ops);
+
 #endif /* _FS_FUSE_I_H */
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index c3777008b318..1eb64b13d508 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -599,7 +599,7 @@ void fuse_unregister_kio(struct fuse_kio_ops *ops)
 }
 EXPORT_SYMBOL_GPL(fuse_unregister_kio);
 
-static struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
+struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
 {
 	struct fuse_kio_ops *ops;
 	bool once = true;
@@ -621,7 +621,7 @@ static struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
 	return NULL;
 }
 
-static void fuse_kio_put(struct fuse_kio_ops *ops)
+void fuse_kio_put(struct fuse_kio_ops *ops)
 {
 	module_put(ops->owner);
 }
diff --git a/fs/fuse/ioctl.c b/fs/fuse/ioctl.c
index ae4a7ac73018..310cd258e8f4 100644
--- a/fs/fuse/ioctl.c
+++ b/fs/fuse/ioctl.c
@@ -343,6 +343,19 @@ long fuse_ioctl_common(struct file *file, unsigned int cmd,
 	if (fuse_is_bad(inode))
 		return -EIO;
 
+	if (cmd == FUSE_IOC_KIO_CALL) { /* forwarded to this connection's kio module */
+		struct fuse_conn *fc = get_fuse_conn(inode);
+		struct fuse_kio_call req;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		if (!fc->kio.op || !fc->kio.op->ioctl)
+			return -EINVAL;
+		if (strncmp(req.name, fc->kio.op->name, sizeof(req.name))) /* must name the active kio */
+			return -EINVAL;
+		return fc->kio.op->ioctl(file, inode, req.cmd, req.data, req.len);
+	}
+
 	return fuse_do_ioctl(file, cmd, arg, flags);
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 32a882e2b2c8..dbca68fe1ccb 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -604,6 +604,7 @@ int pcs_cluster_init(struct pcs_fuse_cluster *pfc, struct workqueue_struct *wq,
 	attr.node = info->node_id;
 	attr.abort_timeout_ms = 0;
 
+	INIT_LIST_HEAD(&pfc->list); /* lets unregister_client() run even if never registered */
 	pfc->fc = fc;
 
 	/* core init */
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 31a5830cf30c..797300c0ffca 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -51,6 +51,7 @@ static inline void ireq_retry_inc(struct pcs_int_request *ireq)
 }
 
 struct pcs_fuse_cluster {
+	struct list_head list; /* entry in pcs_client_list (pcs_fuse_kdirect.c) */
 	struct pcs_cluster_core cc;
 	struct fuse_conn *fc;
 };
@@ -138,4 +139,8 @@ static inline void pcs_cc_set_abort_timeout(struct pcs_cluster_core *cc, int tim
 	cc->cfg.def.abort_timeout = cc->cfg.curr.abort_timeout = timeout;
 }
 
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id); /* returns fd or -errno */
+int pcs_csa_init(void);
+void pcs_csa_fini(void);
+
 #endif /* _PCS_CLUSTER_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 13e2b4e6b50c..7e5070d1ee86 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -609,6 +609,11 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	int storage_version = atomic_read(&ireq->cc->storage_version);
 	int aligned_msg;
 
+	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511)) { /* 512-aligned reads only: served with IOCB_DIRECT */
+		if (pcs_csa_cs_submit(cs, ireq)) /* taken over by the local direct path */
+			return;
+	}
+
 	msg->private = cs;
 
 	BUG_ON(msg->rpc);
@@ -885,6 +890,7 @@ static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
 	spin_unlock(&cs->css->lock);
 
 	pcs_cs_truncate_maps(cs);
+	pcs_csa_cs_detach(cs); /* drops the nmaps reference held by csa_ctx */
 
 	BUG_ON(cs->nmaps);
 
@@ -905,6 +911,7 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
 	BUG_ON(!list_empty(&cs->active_list));
 	BUG_ON(!list_empty(&cs->cong_queue));
 	BUG_ON(!cs->is_dead);
+	BUG_ON(cs->csa_ctx); /* must have been detached in pcs_cs_isolate() */
 
 	if (cs->rpc) {
 		pcs_rpc_close(cs->rpc);
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index b7fe63a7a3d8..0564ef33514d 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -97,6 +97,8 @@ struct pcs_cs {
 	int			nmaps;
 	struct list_head	map_list;
 
+	struct pcs_csa_context	*csa_ctx; /* local direct-access context, changed under cs->lock */
+
 	struct {
 		struct fuse_lat_stat __percpu *iolat;
 		struct fuse_lat_stat __percpu *netlat;
@@ -211,4 +213,7 @@ void pcs_cs_set_stat_up(struct pcs_cs_set *set);
 u32 pcs_cs_msg_size(u32 size, u32 storage_version);
 struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
+int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq); /* 1 if served locally */
+void pcs_csa_cs_detach(struct pcs_cs * cs);
+
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
new file mode 100644
index 000000000000..9444f751fe10
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -0,0 +1,543 @@
+#include <linux/types.h>
+#include <linux/file.h>
+#include <linux/rbtree.h>
+#include <linux/highmem.h>
+#include <linux/log2.h>
+#include <linux/module.h>
+#include <linux/anon_inodes.h>
+
+#include "pcs_types.h"
+#include "pcs_sock_io.h"
+#include "pcs_rpc.h"
+#include "pcs_req.h"
+#include "pcs_map.h"
+#include "pcs_cs.h"
+#include "pcs_ioctl.h"
+#include "pcs_cluster.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+/* CSA context can be referenced from two places:
+ *  * csaccel file struct as filp->private_data
+ *    This reference is dropped at csaccel file close
+ *  * struct cs as cs->csa_ctx
+ *    This reference is dropped at unmount
+ *
+ * CSA entries can be referenced only from radix tree at corresponding CSA.
+ * No reference counting is done, releases are done through RCU cycle,
+ * so an entry may be used only inside an rcu_read_lock() section.
+ *
+ * Tricky part which could be done nicer. ctx->cs is protected with cs->lock and rcu.
+ * So we dereference ctx->cs, lock cs and check that it is still the same afterwards.
+ */
+
+static struct kmem_cache *pcs_csa_cachep;
+
+struct pcs_csa_context
+{
+	struct rcu_head		rcu;
+	struct pcs_cs		*cs;  /* The reference accounted in cs->nmaps */
+	atomic_t		refcnt;
+	int			dead;
+	spinlock_t		lock;
+	wait_queue_head_t	wqh;
+	struct radix_tree_root  tree; /* GFP_ATOMIC */
+};
+
+struct pcs_csa_entry
+{
+	struct rcu_head		rcu;
+	PCS_CHUNK_UID_T		chunk_id;
+	PCS_MAP_VERSION_T	version;
+	unsigned int		flags;
+	int			dead;
+	struct file		*file;
+};
+
+static inline void __cse_destroy(struct pcs_csa_entry * cse)
+{
+	if (cse->file) {
+		fput(cse->file);
+		cse->file = NULL;
+	}
+	kmem_cache_free(pcs_csa_cachep, cse);
+}
+
+static void cse_destroy_rcu(struct rcu_head *head)
+{
+	struct pcs_csa_entry * cse = container_of(head, struct pcs_csa_entry, rcu);
+	__cse_destroy(cse);
+}
+
+static void csa_clear_tree(struct pcs_csa_context *ctx)
+{
+#define BATCH_SIZE 16
+	struct pcs_csa_entry *cse_buf[BATCH_SIZE];
+	int nr;
+	u64 pos = 0;
+
+	do {
+		int i;
+
+		spin_lock(&ctx->lock);
+		nr = radix_tree_gang_lookup(&ctx->tree, (void **)cse_buf, pos, BATCH_SIZE);
+
+		for (i = 0; i < nr; i++) {
+			struct pcs_csa_entry * cse = cse_buf[i];
+			pos = cse->chunk_id;
+			radix_tree_delete(&ctx->tree, cse->chunk_id);
+			call_rcu(&cse->rcu, cse_destroy_rcu);
+		}
+		spin_unlock(&ctx->lock);
+		pos++;
+	} while (nr);
+}
+
+static void csa_destroy_rcu(struct rcu_head *head)
+{
+	struct pcs_csa_context * ctx = container_of(head, struct pcs_csa_context, rcu);
+	BUG_ON(!ctx->dead);
+	csa_clear_tree(ctx);
+	/* refcnt dropped to zero and a grace period passed: nobody can see ctx */
+	kfree(ctx);
+}
+
+static inline void pcs_csa_put(struct pcs_csa_context * ctx)
+{
+	if (atomic_dec_and_test(&ctx->refcnt))
+		call_rcu(&ctx->rcu, csa_destroy_rcu);
+}
+
+static inline void __pcs_csa_put(struct pcs_csa_context * ctx)
+{
+	if (atomic_dec_and_test(&ctx->refcnt))
+		BUG();
+}
+
+void pcs_csa_cs_detach(struct pcs_cs * cs)
+{
+	struct pcs_csa_context * csa_ctx;
+
+	assert_spin_locked(&cs->lock);
+
+	if ((csa_ctx = cs->csa_ctx) != NULL) {
+		csa_ctx->cs = NULL;
+		cs->nmaps--;
+		cs->csa_ctx = NULL;
+		csa_ctx->dead = 1;
+		wake_up_poll(&csa_ctx->wqh, EPOLLHUP);
+		pcs_csa_put(csa_ctx);
+	}
+}
+
+/* The caller must hold rcu_read_lock() while it uses the returned entry */
+static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+{
+	return radix_tree_lookup(&ctx->tree, chunk_id);
+}
+
+static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
+		      struct file * file)
+{
+	struct pcs_csa_entry * csa, * csb;
+
+	if (file == NULL) {
+		spin_lock(&ctx->lock);
+		csa = radix_tree_lookup(&ctx->tree, chunk_id);
+		if (csa) {
+			void * ret;
+			ret = radix_tree_delete(&ctx->tree, chunk_id);
+			BUG_ON(!ret || ret != csa);
+			csa->dead = 1;
+			call_rcu(&csa->rcu, cse_destroy_rcu);
+		}
+		spin_unlock(&ctx->lock);
+		return 0;
+	}
+
+	csb = kmem_cache_zalloc(pcs_csa_cachep, GFP_NOIO);
+	if (!csb)
+		return -ENOMEM;
+
+	csb->chunk_id = chunk_id;
+	csb->version = *vers;
+	csb->flags = flags;
+	csb->file = file;
+	get_file(file);
+
+again:
+	if (radix_tree_preload(GFP_NOIO)) {
+		__cse_destroy(csb);
+		return -ENOMEM;
+	}
+
+	spin_lock(&ctx->lock);
+	csa = radix_tree_lookup(&ctx->tree, chunk_id);
+	if (csa) {
+		void *ret;
+
+		ret = radix_tree_delete(&ctx->tree, chunk_id);
+		BUG_ON(!ret || ret != csa);
+		csa->dead = 1;
+		call_rcu(&csa->rcu, cse_destroy_rcu);
+	}
+
+	if (ctx->dead) {
+		spin_unlock(&ctx->lock);
+		radix_tree_preload_end();
+		__cse_destroy(csb);
+		return -ESTALE;
+	}
+
+	if (radix_tree_insert(&ctx->tree, chunk_id, csb)) {
+		spin_unlock(&ctx->lock);
+		radix_tree_preload_end();
+		goto again;
+	}
+	spin_unlock(&ctx->lock);
+	radix_tree_preload_end();
+
+	return 0;
+}
+
+static void pcs_csa_do_completion(struct pcs_aio_req *areq)
+{
+	struct pcs_int_request * ireq;
+
+	if (!atomic_dec_and_test(&areq->iocount))
+		return;
+
+	fput(areq->iocb.ki_filp);
+
+	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
+
+	if (!pcs_if_error(&ireq->error)) {
+		struct fuse_conn * fc = ireq->cc->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_READ, ktime_sub(ktime_get(), ireq->ts_sent));
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_READ_RESP;
+				th->cses = 1;
+
+				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val;
+				ch->misc = ktime_to_us(ireq->ts_sent);
+				ch->ts_net = 0;
+				ch->ts_io = th->lat;
+			}
+			FUSE_TRACE_COMMIT(fc->ktrace);
+		}
+	} else {
+		FUSE_KTRACE(ireq->cc->fc, "AIO error %d %lu, ireq:%p : %llu:%u+%u",
+		      ireq->error.value,
+		      ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+		      ireq, (unsigned long long)ireq->iochunk.chunk,
+		      (unsigned)ireq->iochunk.offset,
+		      (unsigned)ireq->iochunk.size);
+	}
+
+	ireq_complete(ireq);
+}
+
+static void csa_complete_work(struct work_struct *w)
+{
+	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
+
+	pcs_csa_do_completion(areq);
+}
+
+static void pcs_csa_complete(struct kiocb *iocb, long ret)
+{
+	struct pcs_aio_req * areq;
+	struct pcs_int_request * ireq;
+
+	areq = container_of(iocb, struct pcs_aio_req, iocb);
+	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
+
+	INIT_WORK(&areq->work, csa_complete_work);
+
+	if (ret != ireq->iochunk.size) {
+		if (!ireq->error.value) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+		}
+	}
+
+	queue_work(ireq->cc->wq, &areq->work);
+}
+
+/* Submit ireq as a direct AIO read from the local chunk file.
+ * Returns 0 if the request was taken over (it is finished through
+ * pcs_csa_do_completion()), nonzero if the caller must fall back to
+ * sending it to CS.
+ */
+static inline int csa_submit(struct file * file, struct pcs_int_request * ireq)
+{
+	struct pcs_aio_req * areq =  &ireq->iochunk.ar;
+	struct kiocb * iocb = &areq->iocb;
+	struct iov_iter * it = &areq->iter;
+	struct pcs_int_request *parent = ireq->completion_data.parent;
+	pcs_api_iorequest_t *ar;
+	int ret;
+
+	BUG_ON(parent->type != PCS_IREQ_API);
+	ar = parent->apireq.req;
+
+	/* NOTE(review): areq overlays iochunk.msg/hbuf (union in pcs_req.h) and
+	 * get_iter() below already writes into it, so the fallback to the CS
+	 * path must not rely on msg state prepared before this call - confirm.
+	 */
+	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+	if (!iov_iter_is_bvec(it)) {
+		FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+		return -EINVAL;
+	}
+
+	iov_iter_truncate(it, ireq->iochunk.size);
+
+	iocb->ki_pos = ireq->iochunk.offset;
+	iocb->ki_filp = get_file(file);
+	iocb->ki_complete = pcs_csa_complete;
+	iocb->ki_flags = IOCB_DIRECT;
+	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+	/* One reference for this function, one for the completion path */
+	atomic_set(&areq->iocount, 2);
+
+	ireq->ts_sent = ktime_get();
+	ret = call_read_iter(file, iocb, it);
+
+	pcs_csa_do_completion(areq);
+
+	if (ret == -EIOCBQUEUED)
+		return 0;
+
+	if (ret >= 0) {
+		/* Completed synchronously. No good. */
+		FUSE_KTRACE(ireq->cc->fc, "SYNC AIO?");
+		/* ki_complete is not invoked for synchronous completion, do it here */
+		pcs_csa_complete(iocb, ret);
+		return 0;
+	}
+
+	/* Synchronous error. */
+	fput(areq->iocb.ki_filp);
+	FUSE_KTRACE(ireq->cc->fc, "AIO sync errno %d, falling back", ret);
+	return -ret;
+}
+
+/* Try to serve a READ ireq directly from a locally mapped chunk file.
+ * Returns 1 if the request was taken over, 0 if it must be sent to CS.
+ */
+int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
+{
+	struct pcs_map_entry * map = ireq->iochunk.map;
+	struct pcs_csa_context * csa_ctx;
+	struct file * file = NULL;
+	int ret = 0;
+
+	/* The context and its entries are freed through RCU: look up and pin
+	 * the file inside a read section, but do the (possibly blocking) read
+	 * outside of it.
+	 */
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(cs->csa_ctx);
+	if (csa_ctx) {
+		struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+
+		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+		    (csa->flags & PCS_CSA_FL_READ))
+			file = get_file(csa->file);
+	}
+	rcu_read_unlock();
+
+	if (file) {
+		/* XXX Paranoia? Verify! */
+		if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+			if (!csa_submit(file, ireq))
+				ret = 1;
+		}
+		fput(file);
+	}
+	return ret;
+}
+
+static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	struct file * filp = NULL;
+	int err;
+	struct pcs_csa_setmap req;
+
+	if (ctx->dead)
+		return -ESTALE;
+
+	switch (cmd) {
+	case PCS_CSA_IOC_SETMAP:
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		if (req.fd >= 0) {
+			filp = fget(req.fd);
+			if (filp == NULL)
+				return -EBADF;
+		}
+		err = csa_update(ctx, req.chunk_id, req.flags, &req.version, filp);
+		if (filp)
+			fput(filp);
+		return err;
+	}
+
+	return -EINVAL;
+}
+
+static __poll_t csa_poll(struct file *file, poll_table *wait)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	__poll_t events = 0;
+
+	poll_wait(file, &ctx->wqh, wait);
+
+	if (ctx->dead)
+		events |= EPOLLHUP;
+
+	return events;
+}
+
+static int csa_release(struct inode *inode, struct file *file)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	struct pcs_cs * cs;
+
+	ctx->dead = 1;
+	/* RCU keeps cs valid until it is locked and ctx->cs is rechecked */
+	rcu_read_lock();
+	if ((cs = rcu_dereference(ctx->cs)) != NULL) {
+		spin_lock(&cs->lock);
+		if (ctx->cs == cs) {
+			BUG_ON(cs->csa_ctx != ctx);
+			cs->csa_ctx = NULL;
+			cs->nmaps--;
+			ctx->cs = NULL;
+			__pcs_csa_put(ctx);
+		}
+		spin_unlock(&cs->lock);
+	}
+	rcu_read_unlock();
+	wake_up_poll(&ctx->wqh, EPOLLHUP);
+	pcs_csa_put(ctx);
+	return 0;
+}
+
+static const struct file_operations csa_fops = {
+	.owner		= THIS_MODULE,
+	.release	= csa_release,
+	.poll		= csa_poll,
+	.unlocked_ioctl	= csa_ioctl,
+	.llseek		= noop_llseek,
+};
+
+/* Attach a new CSA context to CS cs_id of cluster cc.
+ * Returns an fd of the new [csaccel] file or a negative errno.
+ */
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
+{
+	int fd;
+	struct pcs_cs * cs;
+	struct pcs_csa_context * csa_ctx;
+	struct file * file;
+	PCS_NET_ADDR_T addr = { .type = PCS_ADDRTYPE_NONE };
+
+	cs = pcs_cs_find_create(&cc->css, &cs_id, &addr, CS_FL_LOCAL_SOCK|CS_FL_INACTIVE);
+	if (cs == NULL)
+		return -ENOMEM;
+
+	/* Owned by csa_ctx->cs on success, undone on every failure path */
+	cs->nmaps++;
+	spin_unlock(&cs->lock);
+
+	fd = -ENOMEM;
+	csa_ctx = kzalloc(sizeof(struct pcs_csa_context), GFP_KERNEL);
+	if (csa_ctx == NULL)
+		goto out_unpin;
+
+	atomic_set(&csa_ctx->refcnt, 1);
+	csa_ctx->cs = cs;
+	INIT_RADIX_TREE(&csa_ctx->tree, GFP_ATOMIC);
+	spin_lock_init(&csa_ctx->lock);
+	init_waitqueue_head(&csa_ctx->wqh);
+
+	fd = get_unused_fd_flags(O_CLOEXEC);
+	if (fd < 0)
+		goto out_free;
+
+	file = anon_inode_getfile("[csaccel]", &csa_fops, csa_ctx, 0);
+	if (IS_ERR(file)) {
+		put_unused_fd(fd);
+		fd = PTR_ERR(file);
+		goto out_free;
+	}
+
+	spin_lock(&cs->lock);
+	if (cs->csa_ctx) {
+		/* cs already has a context. Detach ours from cs here, so that
+		 * csa_release() only drops the reference held by the file.
+		 */
+		csa_ctx->cs = NULL;
+		cs->nmaps--;
+		spin_unlock(&cs->lock);
+		put_unused_fd(fd);
+		fput(file);
+		return -EBUSY;
+	}
+	atomic_inc(&csa_ctx->refcnt);
+	cs->csa_ctx = csa_ctx;
+	spin_unlock(&cs->lock);
+	fd_install(fd, file);
+	return fd;
+
+out_free:
+	csa_ctx->dead = 1;
+	pcs_csa_put(csa_ctx);
+out_unpin:
+	spin_lock(&cs->lock);
+	cs->nmaps--;
+	spin_unlock(&cs->lock);
+	return fd;
+}
+
+int pcs_csa_init(void)
+{
+	pcs_csa_cachep = kmem_cache_create("pcs_csa",
+					    sizeof(struct pcs_csa_entry),
+					    0, SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL);
+	if (!pcs_csa_cachep)
+		return -ENOMEM;
+
+	return 0;
+}
+
+void pcs_csa_fini(void)
+{
+	/* Wait for csa_destroy_rcu()/cse_destroy_rcu() callbacks still in
+	 * flight. Twice, because csa_destroy_rcu() queues further callbacks.
+	 */
+	rcu_barrier();
+	rcu_barrier();
+	if (pcs_csa_cachep)
+		kmem_cache_destroy(pcs_csa_cachep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index a8d68fc35de7..29ccabd083f3 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -69,6 +69,39 @@ unsigned int rdmaio_queue_depth = 8;
 module_param(rdmaio_queue_depth, uint, 0644);
 MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
 
+static LIST_HEAD(pcs_client_list);
+static DEFINE_SPINLOCK(pcs_clients_lock);
+
+static void register_client(struct pcs_fuse_cluster * pfc)
+{
+	spin_lock(&pcs_clients_lock);
+	list_add(&pfc->list, &pcs_client_list);
+	spin_unlock(&pcs_clients_lock);
+}
+
+static void unregister_client(struct pcs_fuse_cluster * pfc)
+{
+	spin_lock(&pcs_clients_lock);
+	list_del_init(&pfc->list);
+	spin_unlock(&pcs_clients_lock);
+}
+
+static struct pcs_fuse_cluster * lookup_client(PCS_NODE_ID_T client_id, PCS_CLUSTER_ID_T * cluster_id)
+{
+	struct pcs_fuse_cluster * pfc;
+
+	spin_lock(&pcs_clients_lock);
+	list_for_each_entry(pfc, &pcs_client_list, list) {
+		if (!((pfc->cc.eng.local_id.val ^ client_id.val) & ~PCS_NODE_ALT_MASK) &&
+		    memcmp(cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)) == 0) {
+			spin_unlock(&pcs_clients_lock);
+			return pfc;
+		}
+	}
+	spin_unlock(&pcs_clients_lock);
+	return NULL;
+}
+
 #ifdef CONFIG_DEBUG_KERNEL
 
 static int set_io_fail_percent(const char *val, const struct kernel_param *kp)
@@ -163,6 +196,7 @@ static void process_pcs_init_reply(struct fuse_mount *fm, struct fuse_args *args
 		 */
 		fc->kio.op = fc->kio.cached_op;
 		fc->kio.ctx = pfc;
+		register_client(pfc);
 		pfc = NULL;
 	}
 	spin_unlock(&fc->lock);
@@ -245,6 +279,7 @@ void kpcs_conn_fini(struct fuse_mount *fm)
 		return;
 
 	TRACE("%s fc:%p\n", __FUNCTION__, fc);
+	unregister_client(fc->kio.ctx);
 	flush_workqueue(pcs_wq);
 	pcs_cluster_fini((struct pcs_fuse_cluster *) fc->kio.ctx);
 }
@@ -1708,6 +1743,54 @@ static void kpcs_kill_requests(struct fuse_conn *fc, struct inode *inode)
 	pcs_kio_file_list(fc, kpcs_kill_lreq_itr, inode);
 }
 
+/* FUSE_IOC_KIO_CALL handler. inode is NULL when called through /dev/fuse,
+ * then the client is looked up by req.client_id and req.cluster_id.
+ * Returns an fd of the new CSA context or a negative errno.
+ */
+static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len)
+{
+	struct fuse_conn *fc = inode ? get_fuse_conn(inode) : NULL;
+	struct pcs_fuse_cluster *pfc;
+	struct fuse_pcs_ioc_register req;
+	int res;
+
+	if (cmd != PCS_KIO_CALL_REG)
+		return -EINVAL;
+
+	/* The CSA fd lets its owner substitute the data of reads served by this CS */
+	if (!capable(CAP_SYS_ADMIN))
+		return -EPERM;
+
+	if (len != sizeof(req))
+		return -EINVAL;
+
+	if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+		return -EFAULT;
+
+	if (fc) {
+		pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
+		if (memcmp(&req.cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)))
+			return -ENXIO;
+	} else {
+		/* NOTE(review): the fc reference alone does not obviously keep pfc
+		 * alive against kpcs_conn_fini() - confirm pfc lifetime here.
+		 */
+		mutex_lock(&fuse_mutex);
+		pfc = lookup_client(req.client_id, &req.cluster_id);
+		if (pfc && pfc->fc)
+			fc = fuse_conn_get(pfc->fc);
+		mutex_unlock(&fuse_mutex);
+		if (!fc)
+			return -ENXIO;
+	}
+
+	res = pcs_csa_register(&pfc->cc, req.cs_id);
+
+	if (!inode)
+		fuse_conn_put(fc);
+	return res;
+}
+
 static struct fuse_kio_ops kio_pcs_ops = {
 	.name		= "pcs",
 	.owner		= THIS_MODULE,
@@ -1723,12 +1806,13 @@ static struct fuse_kio_ops kio_pcs_ops = {
 	.file_close	= kpcs_file_close,
 	.inode_release	= kpcs_inode_release,
 	.kill_requests	= kpcs_kill_requests,
+	.ioctl		= kpcs_ioctl,
 };
 
 
 static int __init kpcs_mod_init(void)
 {
 	int err = -ENOMEM;
 	pcs_fuse_req_cachep = kmem_cache_create("pcs_fuse_request",
 						sizeof(struct pcs_fuse_req),
 						0, 0, NULL);
@@ -1756,10 +1840,13 @@ static int __init kpcs_mod_init(void)
 	if (!pcs_cleanup_wq)
 		goto free_wq;
 
+	if (pcs_csa_init())
+		goto free_cleanup_wq;
+
 	fast_path_version = PCS_FAST_PATH_VERSION.full;
 
 	if (fuse_register_kio(&kio_pcs_ops))
-		goto free_cleanup_wq;
+		goto free_csa;
 
 	fuse_trace_root = debugfs_create_dir("fuse", NULL);
 
@@ -1767,6 +1854,8 @@ static int __init kpcs_mod_init(void)
 	       pcs_fuse_req_cachep, pcs_ireq_cachep, pcs_wq);
 
 	return 0;
+free_csa:
+	pcs_csa_fini();
 free_cleanup_wq:
 	destroy_workqueue(pcs_cleanup_wq);
 free_wq:
@@ -1791,6 +1880,7 @@ static void __exit kpcs_mod_exit(void)
 	kmem_cache_destroy(pcs_map_cachep);
 	kmem_cache_destroy(pcs_ireq_cachep);
 	kmem_cache_destroy(pcs_fuse_req_cachep);
+	pcs_csa_fini();
 }
 
 module_init(kpcs_mod_init);
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 02418c5b6c3d..24e6b0be9743 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -87,4 +87,31 @@ struct pcs_ioc_getmap
 #define PCS_IOC_KDIRECT_RELEASE _IO('V',36)
 #define PCS_IOC_GETMAP		_IOWR('V',37, struct pcs_ioc_getmap)
 
+
+#define PCS_KIO_CALL_REG	1	/* fuse_kio_call.cmd: attach CSA, returns its fd */
+
+struct fuse_pcs_ioc_register
+{
+	PCS_NODE_ID_T 		cs_id;
+	PCS_NODE_ID_T 		client_id;
+	PCS_CLUSTER_ID_T	cluster_id;
+	PCS_INTEGRITY_SEQ_T	integrity_seq;
+	u32			reserved;
+};
+
+struct pcs_csa_setmap
+{
+	PCS_CHUNK_UID_T		chunk_id;
+	PCS_MAP_VERSION_T	version;
+	int			fd;		/* < 0 removes the chunk mapping */
+	u32			flags;
+#define PCS_CSA_FL_READ		1
+#define PCS_CSA_FL_WRITE	2
+	PCS_SYNC_SEQ_T		sync_epoch;
+	PCS_SYNC_SEQ_T		sync_seq;
+	u64			reserved;
+};
+
+#define PCS_CSA_IOC_SETMAP	_IOR('V',38, struct pcs_csa_setmap) /* NOTE(review): the argument is only read by the kernel, _IOW would be the usual encoding */
+
 #endif /* _PCS_IOCTL_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index 9393d8d13421..d1ed5484f41b 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -33,6 +33,7 @@ typedef u64 PCS_FILE_ID_T;
 #define PCS_NODE_TYPE_SHIFT	10
 #define PCS_NODE_TYPE_MASK	(((1ULL << PCS_NODE_TYPE_BITS) - 1) << PCS_NODE_TYPE_SHIFT)
 #define PCS_NODE_ID_MASK	(~PCS_NODE_TYPE_MASK)
+#define PCS_NODE_ALT_MASK	(1ULL << 63)	/* ignored when matching client ids */
 
 typedef struct __pre_aligned(8) _PCS_CHUNK_ID_T {
 	PCS_FILE_ID_T	fileid;
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 8166ad056e0f..8c0bd3e43079 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -46,6 +46,14 @@ enum
  * Messages can be of various "type".
  */
 
+struct pcs_aio_req /* state of a direct local read, see pcs_cs_accel.c */
+{
+	struct kiocb		iocb;
+	atomic_t		iocount;	/* submitter + completion */
+	struct iov_iter 	iter;
+	struct work_struct	work;		/* completion is finished on cc->wq */
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -118,11 +126,16 @@ struct pcs_int_request
 			u64			offset;
 			struct pcs_cs_list	*csl;
 			PCS_NODE_ID_T		banned_cs;
-			struct pcs_msg		msg;
-			struct pcs_cs_iohdr	hbuf;		/* Buffer for header.
-								 * A little ugly
-								 */
-			struct kvec		hbuf_kv;
+			union {
+				struct {
+					struct pcs_msg		msg;
+					struct pcs_cs_iohdr	hbuf;		/* Buffer for header.
+										 * A little ugly
+										 */
+					struct kvec		hbuf_kv;
+				};
+				struct pcs_aio_req		ar;	/* direct local read; overlays msg/hbuf */
+			};
 		} iochunk;
 
 		struct {
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 9924cd088bd6..ace90d721038 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -1032,4 +1032,14 @@ struct fuse_secctx_header {
 	uint32_t	nr_secctx;
 };
 
+struct fuse_kio_call
+{
+	uint32_t	cmd;		/* kio-specific command */
+	uint32_t	len;		/* size of the argument at data */
+	uint64_t	data;		/* user pointer to the argument */
+	uint8_t		name[32];	/* kio module name, e.g. "pcs" */
+};
+
+#define FUSE_IOC_KIO_CALL	_IOW('V',39,struct fuse_kio_call)
+
 #endif /* _LINUX_FUSE_H */


More information about the Devel mailing list