[Devel] [PATCH VZ9 03/20] fuse: direct access to local cs repositories

Alexey Kuznetsov kuznet at virtuozzo.com
Fri Oct 6 13:42:28 MSK 2023


For now, it is used only under limited circumstances:
only for reads, only for journalless unencrypted CS. Extensions
are possible, but not before we see the result.
The value of this POC case is that it achieves the minimal physically
possible latency; extensions will result in some degradation,
which is to be evaluated separately.

The idea is the following. When CS receives a read request
from a local kernel client, it installs a map for this file
into the kernel. Subsequent requests are not sent to CS,
but are processed directly in the kernel. What makes this case
unique is that it not only completely avoids syscalls and context
switches to user threads, but is also exactly zero-copy.

Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
 fs/fuse/Makefile                   |   3 +-
 fs/fuse/dev.c                      |  14 ++
 fs/fuse/fuse_i.h                   |   4 +
 fs/fuse/inode.c                    |   4 +-
 fs/fuse/ioctl.c                    |  13 +
 fs/fuse/kio/pcs/pcs_cluster.c      |   1 +
 fs/fuse/kio/pcs/pcs_cluster.h      |   5 +
 fs/fuse/kio/pcs/pcs_cs.c           |   7 +
 fs/fuse/kio/pcs/pcs_cs.h           |   5 +
 fs/fuse/kio/pcs/pcs_cs_accel.c     | 499 +++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  85 ++++++-
 fs/fuse/kio/pcs/pcs_ioctl.h        |  27 ++
 fs/fuse/kio/pcs/pcs_prot_types.h   |   1 +
 fs/fuse/kio/pcs/pcs_req.h          |  23 +-
 include/uapi/linux/fuse.h          |  10 +
 15 files changed, 692 insertions(+), 9 deletions(-)
 create mode 100644 fs/fuse/kio/pcs/pcs_cs_accel.c

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 37ef32f..afed004 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -32,6 +32,7 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/pcs_rdma_io.o \
 	kio/pcs/pcs_rdma_rw.o \
 	kio/pcs/pcs_rdma_conn.o \
-	kio/pcs/pcs_net_addr.o
+	kio/pcs/pcs_net_addr.o \
+	kio/pcs/pcs_cs_accel.o
 
 virtiofs-y := virtio_fs.o
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 424180f..0007aed 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2428,6 +2428,26 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd,
 			res = 0;
 		}
 		break;
+	case FUSE_IOC_KIO_CALL:
+	{
+		struct fuse_kio_call req;
+		struct fuse_kio_ops *op;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		/* name[] is a fixed-size field filled by userspace; terminate it before using it as a string */
+		req.name[sizeof(req.name) - 1] = '\0';
+		op = fuse_kio_get(NULL, req.name);
+		if (op == NULL)
+			return -EINVAL;
+		/* ->ioctl is optional; mirror the check done in fuse_ioctl_common() */
+		if (op->ioctl)
+			res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
+		else
+			res = -EINVAL;
+		fuse_kio_put(op);
+		break;
+	}
 	default:
 		res = -ENOTTY;
 		break;
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index ce90c82..e365400 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -632,6 +632,7 @@ struct fuse_kio_ops {
 	void (*file_close)(struct file *file, struct inode *inode);
 	void (*inode_release)(struct fuse_inode *fi);
 	void (*kill_requests)(struct fuse_conn *fc, struct inode *inode);
+	int  (*ioctl)(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len);
 
 };
 int fuse_register_kio(struct fuse_kio_ops *ops);
@@ -1529,4 +1530,7 @@ struct fuse_file *fuse_file_open(struct fuse_mount *fm, u64 nodeid,
 void fuse_file_release(struct inode *inode, struct fuse_file *ff,
 		       unsigned int open_flags, fl_owner_t id, bool isdir);
 
+struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name);
+void fuse_kio_put(struct fuse_kio_ops *ops);
+
 #endif /* _FS_FUSE_I_H */
diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c
index c377700..1eb64b1 100644
--- a/fs/fuse/inode.c
+++ b/fs/fuse/inode.c
@@ -599,7 +599,7 @@ void fuse_unregister_kio(struct fuse_kio_ops *ops)
 }
 EXPORT_SYMBOL_GPL(fuse_unregister_kio);
 
-static struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
+struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
 {
 	struct fuse_kio_ops *ops;
 	bool once = true;
@@ -621,7 +621,7 @@ static struct fuse_kio_ops *fuse_kio_get(struct fuse_conn *fc, char *name)
 	return NULL;
 }
 
-static void fuse_kio_put(struct fuse_kio_ops *ops)
+void fuse_kio_put(struct fuse_kio_ops *ops)
 {
 	module_put(ops->owner);
 }
diff --git a/fs/fuse/ioctl.c b/fs/fuse/ioctl.c
index ae4a7ac..310cd25 100644
--- a/fs/fuse/ioctl.c
+++ b/fs/fuse/ioctl.c
@@ -343,6 +343,19 @@ long fuse_ioctl_common(struct file *file, unsigned int cmd,
 	if (fuse_is_bad(inode))
 		return -EIO;
 
+	if (cmd == FUSE_IOC_KIO_CALL) {
+		struct fuse_conn *fc = get_fuse_conn(inode);
+		struct fuse_kio_call req;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		if (!fc->kio.op || !fc->kio.op->ioctl)
+			return -EINVAL;
+		if (strncmp(req.name, fc->kio.op->name, sizeof(req.name)))
+			return -EINVAL;
+		return fc->kio.op->ioctl(file, inode, req.cmd, req.data, req.len);
+	}
+
 	return fuse_do_ioctl(file, cmd, arg, flags);
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 32a882e..dbca68f 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -604,6 +604,7 @@ int pcs_cluster_init(struct pcs_fuse_cluster *pfc, struct workqueue_struct *wq,
 	attr.node = info->node_id;
 	attr.abort_timeout_ms = 0;
 
+	INIT_LIST_HEAD(&pfc->list);
 	pfc->fc = fc;
 
 	/* core init */
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 31a5830..797300c 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -51,6 +51,7 @@ static inline void ireq_retry_inc(struct pcs_int_request *ireq)
 }
 
 struct pcs_fuse_cluster {
+	struct list_head list;
 	struct pcs_cluster_core cc;
 	struct fuse_conn *fc;
 };
@@ -138,4 +139,8 @@ static inline void pcs_cc_set_abort_timeout(struct pcs_cluster_core *cc, int tim
 	cc->cfg.def.abort_timeout = cc->cfg.curr.abort_timeout = timeout;
 }
 
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id);
+int pcs_csa_init(void);
+void pcs_csa_fini(void);
+
 #endif /* _PCS_CLUSTER_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 13e2b4e..7e5070d 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -609,6 +609,11 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	int storage_version = atomic_read(&ireq->cc->storage_version);
 	int aligned_msg;
 
+	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511)) {
+		if (pcs_csa_cs_submit(cs, ireq))
+			return;
+	}
+
 	msg->private = cs;
 
 	BUG_ON(msg->rpc);
@@ -885,6 +890,7 @@ static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
 	spin_unlock(&cs->css->lock);
 
 	pcs_cs_truncate_maps(cs);
+	pcs_csa_cs_detach(cs);
 
 	BUG_ON(cs->nmaps);
 
@@ -905,6 +911,7 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
 	BUG_ON(!list_empty(&cs->active_list));
 	BUG_ON(!list_empty(&cs->cong_queue));
 	BUG_ON(!cs->is_dead);
+	BUG_ON(cs->csa_ctx);
 
 	if (cs->rpc) {
 		pcs_rpc_close(cs->rpc);
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index b7fe63a..0564ef3 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -97,6 +97,8 @@ struct pcs_cs {
 	int			nmaps;
 	struct list_head	map_list;
 
+	struct pcs_csa_context	*csa_ctx;
+
 	struct {
 		struct fuse_lat_stat __percpu *iolat;
 		struct fuse_lat_stat __percpu *netlat;
@@ -211,4 +213,7 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
 u32 pcs_cs_msg_size(u32 size, u32 storage_version);
 struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
+int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
+void pcs_csa_cs_detach(struct pcs_cs * cs);
+
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
new file mode 100644
index 0000000..9444f75
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -0,0 +1,499 @@
+#include <linux/types.h>
+#include <linux/file.h>
+#include <linux/rbtree.h>
+#include <linux/highmem.h>
+#include <linux/log2.h>
+#include <linux/module.h>
+#include <linux/anon_inodes.h>
+
+#include "pcs_types.h"
+#include "pcs_sock_io.h"
+#include "pcs_rpc.h"
+#include "pcs_sock_io.h"
+#include "pcs_req.h"
+#include "pcs_map.h"
+#include "pcs_cs.h"
+#include "pcs_ioctl.h"
+#include "pcs_cluster.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+/* CSA context can be referenced from two places:
+ *  * csaccel file struct as filp->private_data
+ *    This reference is dropped at csaccel file close
+ *  * struct cs as cs->csa_ctx
+ *    This reference is dropped at unmount
+ *
+ * CSA entries can be referenced only from radix tree at corresponding CSA.
+ * No reference counting is done, releases are done through RCU cycle.
+ *
+ * Tricky part which could be done nice. ctx->cs is protected with cs->lock and rcu.
+ * So we dereference ctx->cs, lock cs and check that it is still the same afterwards.
+ */
+
+struct kmem_cache *pcs_csa_cachep;
+
+struct pcs_csa_context
+{
+	struct rcu_head		rcu;
+	struct pcs_cs		*cs;  /* The reference accounted in cs->nmaps */
+	atomic_t		refcnt;
+	int			dead;
+	spinlock_t		lock;
+	wait_queue_head_t	wqh;
+	struct radix_tree_root  tree; /* GFP_ATOMIC */
+};
+
+struct pcs_csa_entry
+{
+	struct rcu_head		rcu;
+	PCS_CHUNK_UID_T		chunk_id;
+	PCS_MAP_VERSION_T	version;
+	unsigned int		flags;
+	int			dead;
+	struct file		*file;
+};
+
+static inline void __cse_destroy(struct pcs_csa_entry * cse)
+{
+	if (cse->file) {
+		fput(cse->file);
+		cse->file = NULL;
+	}
+	kmem_cache_free(pcs_csa_cachep, cse);
+}
+
+static void cse_destroy_rcu(struct rcu_head *head)
+{
+	struct pcs_csa_entry * cse = container_of(head, struct pcs_csa_entry, rcu);
+	__cse_destroy(cse);
+}
+
+/* Unlink every entry from ctx->tree, in batches; each entry is destroyed via call_rcu(). */
+static void csa_clear_tree(struct pcs_csa_context *ctx)
+{
+#define BATCH_SIZE 16
+	struct pcs_csa_entry *cse_buf[BATCH_SIZE];
+	int nr, i;
+	u64 pos = 0;
+
+	do {
+		spin_lock(&ctx->lock);
+		nr = radix_tree_gang_lookup(&ctx->tree, (void **)cse_buf, pos, BATCH_SIZE);
+		for (i = 0; i < nr; i++) {
+			struct pcs_csa_entry * cse = cse_buf[i];
+			pos = cse->chunk_id;
+			radix_tree_delete(&ctx->tree, cse->chunk_id);
+			call_rcu(&cse->rcu, cse_destroy_rcu);
+		}
+		spin_unlock(&ctx->lock);
+		pos++;	/* continue the scan just past the last entry removed */
+	} while (nr);
+}
+
+/* RCU callback scheduled by the final pcs_csa_put(): drop all entries, then the context itself. */
+static void csa_destroy_rcu(struct rcu_head *head)
+{
+	struct pcs_csa_context * ctx = container_of(head, struct pcs_csa_context, rcu);
+	BUG_ON(!ctx->dead);
+	csa_clear_tree(ctx);
+	kfree(ctx);	/* kzalloc()ed in pcs_csa_register(); nothing else in this file frees it */
+}
+
+static inline void pcs_csa_put(struct pcs_csa_context * ctx)
+{
+	if (atomic_dec_and_test(&ctx->refcnt))
+		call_rcu(&ctx->rcu, csa_destroy_rcu);
+}
+
+static inline void __pcs_csa_put(struct pcs_csa_context * ctx)
+{
+	if (atomic_dec_and_test(&ctx->refcnt))
+		BUG();
+}
+
+void pcs_csa_cs_detach(struct pcs_cs * cs)
+{
+	struct pcs_csa_context * csa_ctx;
+
+	assert_spin_locked(&cs->lock);
+
+	if ((csa_ctx = cs->csa_ctx) != NULL) {
+		csa_ctx->cs = NULL;
+		cs->nmaps--;
+		cs->csa_ctx = NULL;
+		csa_ctx->dead = 1;
+		wake_up_poll(&csa_ctx->wqh, EPOLLHUP);
+		pcs_csa_put(csa_ctx);
+	}
+}
+
+static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+{
+	struct pcs_csa_entry * cse;
+
+	rcu_read_lock();
+	cse= radix_tree_lookup(&ctx->tree, chunk_id);
+	rcu_read_unlock();
+	return cse;
+}
+
+static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
+		      struct file * file)
+{
+	struct pcs_csa_entry * csa, * csb;
+
+	if (file == NULL) {
+		spin_lock(&ctx->lock);
+		csa = radix_tree_lookup(&ctx->tree, chunk_id);
+		if (csa) {
+			void * ret;
+			ret = radix_tree_delete(&ctx->tree, chunk_id);
+			BUG_ON(!ret || ret != csa);
+			csa->dead = 1;
+			call_rcu(&csa->rcu, cse_destroy_rcu);
+		}
+		spin_unlock(&ctx->lock);
+		return 0;
+	}
+
+	csb = kmem_cache_zalloc(pcs_csa_cachep, GFP_NOIO);
+	if (!csb)
+		return -ENOMEM;
+
+	csb->chunk_id = chunk_id;
+	csb->version = *vers;
+	csb->flags = flags;
+	csb->file = file;
+	get_file(file);
+
+again:
+	if (radix_tree_preload(GFP_NOIO)) {
+		__cse_destroy(csb);
+		return -ENOMEM;
+	}
+
+	spin_lock(&ctx->lock);
+	csa = radix_tree_lookup(&ctx->tree, chunk_id);
+	if (csa) {
+		void *ret;
+
+		ret = radix_tree_delete(&ctx->tree, chunk_id);
+		BUG_ON(!ret || ret != csa);
+		csa->dead = 1;
+		call_rcu(&csa->rcu, cse_destroy_rcu);
+	}
+
+	if (ctx->dead) {
+		spin_unlock(&ctx->lock);
+		radix_tree_preload_end();
+		__cse_destroy(csb);
+		return -ESTALE;
+	}
+
+	if (radix_tree_insert(&ctx->tree, chunk_id, csb)) {
+		spin_unlock(&ctx->lock);
+		radix_tree_preload_end();
+		goto again;
+	}
+	spin_unlock(&ctx->lock);
+	radix_tree_preload_end();
+
+	return 0;
+}
+
+static void pcs_csa_do_completion(struct pcs_aio_req *areq)
+{
+	struct pcs_int_request * ireq;
+
+	if (!atomic_dec_and_test(&areq->iocount))
+		return;
+
+	fput(areq->iocb.ki_filp);
+
+	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
+
+	if (!pcs_if_error(&ireq->error)) {
+		struct fuse_conn * fc = ireq->cc->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_READ, ktime_sub(ktime_get(), ireq->ts_sent));
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_READ_RESP;
+				th->cses = 1;
+
+				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val;
+				ch->misc = ktime_to_us(ireq->ts_sent);
+				ch->ts_net = 0;
+				ch->ts_io = th->lat;
+			}
+			FUSE_TRACE_COMMIT(fc->ktrace);
+		}
+	} else {
+		FUSE_KTRACE(ireq->cc->fc, "AIO error %d %lu, ireq:%p : %llu:%u+%u",
+		      ireq->error.value,
+		      ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+		      ireq, (unsigned long long)ireq->iochunk.chunk,
+		      (unsigned)ireq->iochunk.offset,
+		      (unsigned)ireq->iochunk.size);
+	}
+
+	ireq_complete(ireq);
+}
+
+static void csa_complete_work(struct work_struct *w)
+{
+	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
+
+	pcs_csa_do_completion(areq);
+}
+
+static void pcs_csa_complete(struct kiocb *iocb, long ret)
+{
+	struct pcs_aio_req * areq;
+	struct pcs_int_request * ireq;
+
+	areq = container_of(iocb, struct pcs_aio_req, iocb);
+	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
+
+	INIT_WORK(&areq->work, csa_complete_work);
+
+	if (ret != ireq->iochunk.size) {
+		if (!ireq->error.value) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+		}
+	}
+
+	queue_work(ireq->cc->wq, &areq->work);
+}
+
+/* Try to serve ireq with a direct async read from @file; a nonzero return means "fall back". */
+static inline int csa_submit(struct file * file, struct pcs_int_request * ireq)
+{
+	struct pcs_aio_req * areq =  &ireq->iochunk.ar;
+	struct kiocb * iocb = &areq->iocb;
+	struct iov_iter * it = &areq->iter;
+	struct pcs_int_request *parent = ireq->completion_data.parent;
+	pcs_api_iorequest_t *ar;
+	int ret;
+
+	BUG_ON(parent->type != PCS_IREQ_API);
+	ar = parent->apireq.req;
+
+	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+	if (!iov_iter_is_bvec(it)) {
+		FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+		return -EINVAL;
+	}
+
+	iov_iter_truncate(it, ireq->iochunk.size);
+
+	iocb->ki_pos = ireq->iochunk.offset;
+	iocb->ki_filp = get_file(file);
+	iocb->ki_complete = pcs_csa_complete;
+	iocb->ki_flags = IOCB_DIRECT;
+	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+	/* Two completions are expected: ours right below and the one queued by pcs_csa_complete(). */
+	atomic_set(&areq->iocount, 2);
+	ireq->ts_sent = ktime_get();
+	ret = call_read_iter(file, iocb, it);
+	pcs_csa_do_completion(areq);
+
+	if (ret == -EIOCBQUEUED)
+		return 0;
+
+	if (ret >= 0) {
+		/* Completed synchronously. No good: deliver the completion by hand. */
+		FUSE_KTRACE(ireq->cc->fc, "SYNC AIO?");
+		pcs_csa_complete(iocb, ret);
+		return 0;
+	}
+
+	/* Synchronous error: drop the file reference taken above and let the caller fall back. */
+	fput(areq->iocb.ki_filp);
+	FUSE_KTRACE(ireq->cc->fc, "AIO sync errno %d, falling back", ret);
+	return -ret;
+}
+
+int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
+{
+	struct pcs_csa_context * csa_ctx = rcu_dereference(cs->csa_ctx);
+
+	if (csa_ctx) {
+		struct pcs_map_entry * map = ireq->iochunk.map;
+		struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+		    (csa->flags & PCS_CSA_FL_READ)) {
+			/* XXX Paranoia? Verify! */
+			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				if (!csa_submit(csa->file, ireq))
+					return 1;
+			}
+		}
+	}
+	return 0;
+}
+
+static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	struct file * filp = NULL;
+	int err;
+	struct pcs_csa_setmap req;
+
+	if (ctx->dead)
+		return -ESTALE;
+
+	switch (cmd) {
+	case PCS_CSA_IOC_SETMAP:
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+		if (req.fd >= 0) {
+			filp = fget(req.fd);
+			if (filp == NULL)
+				return -EBADF;
+		}
+		err = csa_update(ctx, req.chunk_id, req.flags, &req.version, filp);
+		if (filp)
+			fput(filp);
+		return err;
+	}
+
+	return -EINVAL;
+}
+
+static __poll_t csa_poll(struct file *file, poll_table *wait)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	__poll_t events = 0;
+
+	poll_wait(file, &ctx->wqh, wait);
+
+	if (ctx->dead)
+		events |= EPOLLHUP;
+
+	return events;
+}
+
+static int csa_release(struct inode *inode, struct file *file)
+{
+	struct pcs_csa_context *ctx = file->private_data;
+	struct pcs_cs * cs;
+
+	ctx->dead = 1;
+	if ((cs = rcu_dereference(ctx->cs)) != NULL) {
+		spin_lock(&cs->lock);
+		if (ctx->cs == cs) {
+			BUG_ON(cs->csa_ctx != ctx);
+			cs->csa_ctx = NULL;
+			cs->nmaps--;
+			ctx->cs = NULL;
+			__pcs_csa_put(ctx);
+		}
+		spin_unlock(&cs->lock);
+	}
+	wake_up_poll(&ctx->wqh, EPOLLHUP);
+	pcs_csa_put(ctx);
+	module_put(THIS_MODULE);
+	return 0;
+}
+
+static const struct file_operations csa_fops = {
+	.release	= csa_release,
+	.poll		= csa_poll,
+	.unlocked_ioctl	= csa_ioctl,
+	.llseek		= noop_llseek,
+};
+
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
+{
+	int fd;
+	struct pcs_cs * cs;
+	struct pcs_csa_context * csa_ctx;
+	struct file * file;
+	PCS_NET_ADDR_T addr = { .type = PCS_ADDRTYPE_NONE };
+
+	cs = pcs_cs_find_create(&cc->css, &cs_id, &addr, CS_FL_LOCAL_SOCK|CS_FL_INACTIVE);
+	if (cs == NULL)
+		return -ENOMEM;
+
+	cs->nmaps++;
+	spin_unlock(&cs->lock);
+
+	fd = -ENOMEM;
+	csa_ctx = kzalloc(sizeof(struct pcs_csa_context), GFP_KERNEL);
+	if (csa_ctx == NULL)
+		goto out;
+
+	atomic_set(&csa_ctx->refcnt, 1);
+	csa_ctx->cs = cs;
+	INIT_RADIX_TREE(&csa_ctx->tree, GFP_ATOMIC);
+	spin_lock_init(&csa_ctx->lock);
+	init_waitqueue_head(&csa_ctx->wqh);
+
+	fd = get_unused_fd_flags(O_CLOEXEC);
+	if (fd < 0)
+		goto out;
+
+	file = anon_inode_getfile("[csaccel]", &csa_fops, csa_ctx, 0);
+	if (IS_ERR(file)) {
+		put_unused_fd(fd);
+		fd = PTR_ERR(file);
+		goto out;
+	}
+
+	spin_lock(&cs->lock);
+	if (cs->csa_ctx) {
+		spin_unlock(&cs->lock);
+		put_unused_fd(fd);
+		fd = -EBUSY;
+		goto out;
+	}
+	atomic_inc(&csa_ctx->refcnt);
+	cs->csa_ctx = csa_ctx;
+	spin_unlock(&cs->lock);
+	fd_install(fd, file);
+	__module_get(THIS_MODULE);
+	return fd;
+
+out:
+	if (csa_ctx) {
+		csa_ctx->dead = 1;
+		pcs_csa_put(csa_ctx);
+	}
+	return fd;
+}
+
+int pcs_csa_init(void)
+{
+	pcs_csa_cachep = kmem_cache_create("pcs_csa",
+					    sizeof(struct pcs_csa_entry),
+					    0, SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL);
+	if (!pcs_csa_cachep)
+		return -ENOMEM;
+
+	return 0;
+}
+
+void pcs_csa_fini(void)
+{
+	if (pcs_csa_cachep)
+		kmem_cache_destroy(pcs_csa_cachep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index a8d68fc..29ccabd 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -69,6 +69,39 @@
 module_param(rdmaio_queue_depth, uint, 0644);
 MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
 
+static LIST_HEAD(pcs_client_list);
+spinlock_t pcs_clients_lock;
+
+static void register_client(struct pcs_fuse_cluster * pfc)
+{
+	spin_lock(&pcs_clients_lock);
+	list_add(&pfc->list, &pcs_client_list);
+	spin_unlock(&pcs_clients_lock);
+}
+
+static void unregister_client(struct pcs_fuse_cluster * pfc)
+{
+	spin_lock(&pcs_clients_lock);
+	list_del_init(&pfc->list);
+	spin_unlock(&pcs_clients_lock);
+}
+
+static struct pcs_fuse_cluster * lookup_client(PCS_NODE_ID_T client_id, PCS_CLUSTER_ID_T * cluster_id)
+{
+	struct pcs_fuse_cluster * pfc;
+
+	spin_lock(&pcs_clients_lock);
+	list_for_each_entry(pfc, &pcs_client_list, list) {
+		if (!((pfc->cc.eng.local_id.val ^ client_id.val) & ~PCS_NODE_ALT_MASK) &&
+		    memcmp(cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)) == 0) {
+			spin_unlock(&pcs_clients_lock);
+			return pfc;
+		}
+	}
+	spin_unlock(&pcs_clients_lock);
+	return NULL;
+}
+
 #ifdef CONFIG_DEBUG_KERNEL
 
 static int set_io_fail_percent(const char *val, const struct kernel_param *kp)
@@ -163,6 +196,7 @@ static void process_pcs_init_reply(struct fuse_mount *fm, struct fuse_args *args
 		 */
 		fc->kio.op = fc->kio.cached_op;
 		fc->kio.ctx = pfc;
+		register_client(pfc);
 		pfc = NULL;
 	}
 	spin_unlock(&fc->lock);
@@ -245,6 +279,7 @@ void kpcs_conn_fini(struct fuse_mount *fm)
 		return;
 
 	TRACE("%s fc:%p\n", __FUNCTION__, fc);
+	unregister_client(fc->kio.ctx);
 	flush_workqueue(pcs_wq);
 	pcs_cluster_fini((struct pcs_fuse_cluster *) fc->kio.ctx);
 }
@@ -1708,6 +1743,57 @@ static void kpcs_kill_requests(struct fuse_conn *fc, struct inode *inode)
 	pcs_kio_file_list(fc, kpcs_kill_lreq_itr, inode);
 }
 
+/*
+ * Handle a FUSE_IOC_KIO_CALL request routed to this kio module; only
+ * PCS_KIO_CALL_REG is supported.
+ *
+ * @inode is set when the call arrives through a file of a mounted
+ * filesystem and NULL when it arrives through the fuse device, in which
+ * case the cluster is looked up by the client and cluster ids in the request.
+ *
+ * Returns whatever pcs_csa_register() returns, or a negative errno.
+ */
+static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len)
+{
+	struct fuse_conn *fc = inode ? get_fuse_conn(inode) : NULL;
+	struct pcs_fuse_cluster *pfc;
+	struct fuse_pcs_ioc_register req;
+	int res;
+
+	if (cmd != PCS_KIO_CALL_REG)
+		return -EINVAL;
+
+	if (len != sizeof(req))
+		return -EINVAL;
+
+	if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+		return -EFAULT;
+
+	if (fc) {
+		pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
+		if (memcmp(&req.cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)))
+			return -ENXIO;
+	} else {
+		mutex_lock(&fuse_mutex);
+		pfc = lookup_client(req.client_id, &req.cluster_id);
+		if (pfc && pfc->fc)
+			fc = fuse_conn_get(pfc->fc);
+		mutex_unlock(&fuse_mutex);
+		/*
+		 * Fail unless a connection was actually pinned: fc is put
+		 * unconditionally below on this path.
+		 */
+		if (!fc)
+			return -ENXIO;
+	}
+
+	res = pcs_csa_register(&pfc->cc, req.cs_id);
+
+	if (!inode)
+		fuse_conn_put(fc);
+	return res;
+}
+
 static struct fuse_kio_ops kio_pcs_ops = {
 	.name		= "pcs",
 	.owner		= THIS_MODULE,
@@ -1723,12 +1795,17 @@ static void kpcs_kill_requests(struct fuse_conn *fc, struct inode *inode)
 	.file_close	= kpcs_file_close,
 	.inode_release	= kpcs_inode_release,
 	.kill_requests	= kpcs_kill_requests,
+	.ioctl		= kpcs_ioctl,
 };
 
 
 static int __init kpcs_mod_init(void)
 {
 	int err = -ENOMEM;
+
+	spin_lock_init(&pcs_clients_lock);
+	INIT_LIST_HEAD(&pcs_client_list);
+
 	pcs_fuse_req_cachep = kmem_cache_create("pcs_fuse_request",
 						sizeof(struct pcs_fuse_req),
 						0, 0, NULL);
@@ -1756,10 +1833,13 @@ static int __init kpcs_mod_init(void)
 	if (!pcs_cleanup_wq)
 		goto free_wq;
 
+	if (pcs_csa_init())
+		goto free_cleanup_wq;
+
 	fast_path_version = PCS_FAST_PATH_VERSION.full;
 
 	if (fuse_register_kio(&kio_pcs_ops))
-		goto free_cleanup_wq;
+		goto free_csa;
 
 	fuse_trace_root = debugfs_create_dir("fuse", NULL);
 
@@ -1767,6 +1847,8 @@ static int __init kpcs_mod_init(void)
 	       pcs_fuse_req_cachep, pcs_ireq_cachep, pcs_wq);
 
 	return 0;
+free_csa:
+	pcs_csa_fini();
 free_cleanup_wq:
 	destroy_workqueue(pcs_cleanup_wq);
 free_wq:
@@ -1791,6 +1873,7 @@ static void __exit kpcs_mod_exit(void)
 	kmem_cache_destroy(pcs_map_cachep);
 	kmem_cache_destroy(pcs_ireq_cachep);
 	kmem_cache_destroy(pcs_fuse_req_cachep);
+	pcs_csa_fini();
 }
 
 module_init(kpcs_mod_init);
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 02418c5..24e6b0b 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -87,4 +87,31 @@ struct pcs_ioc_getmap
 #define PCS_IOC_KDIRECT_RELEASE _IO('V',36)
 #define PCS_IOC_GETMAP		_IOWR('V',37, struct pcs_ioc_getmap)
 
+
+#define PCS_KIO_CALL_REG	1
+
+struct fuse_pcs_ioc_register
+{
+	PCS_NODE_ID_T 		cs_id;
+	PCS_NODE_ID_T 		client_id;
+	PCS_CLUSTER_ID_T	cluster_id;
+	PCS_INTEGRITY_SEQ_T	integrity_seq;
+	u32			reserved;
+};
+
+struct pcs_csa_setmap
+{
+	PCS_CHUNK_UID_T		chunk_id;
+	PCS_MAP_VERSION_T	version;
+	int			fd;
+	u32			flags;
+#define PCS_CSA_FL_READ		1
+#define PCS_CSA_FL_WRITE	2
+	PCS_SYNC_SEQ_T		sync_epoch;
+	PCS_SYNC_SEQ_T		sync_seq;
+	u64			reserved;
+};
+
+#define PCS_CSA_IOC_SETMAP	_IOR('V',38, struct pcs_csa_setmap)
+
 #endif /* _PCS_IOCTL_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index 9393d8d..d1ed548 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -33,6 +33,7 @@
 #define PCS_NODE_TYPE_SHIFT	10
 #define PCS_NODE_TYPE_MASK	(((1ULL << PCS_NODE_TYPE_BITS) - 1) << PCS_NODE_TYPE_SHIFT)
 #define PCS_NODE_ID_MASK	(~PCS_NODE_TYPE_MASK)
+#define PCS_NODE_ALT_MASK	(1ULL << 63)
 
 typedef struct __pre_aligned(8) _PCS_CHUNK_ID_T {
 	PCS_FILE_ID_T	fileid;
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 8166ad0..8c0bd3e 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -46,6 +46,14 @@ enum
  * Messages can be of various "type".
  */
 
+struct pcs_aio_req
+{
+	struct kiocb		iocb;
+	atomic_t		iocount;
+	struct iov_iter 	iter;
+	struct work_struct	work;
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -118,11 +126,16 @@ struct pcs_int_request
 			u64			offset;
 			struct pcs_cs_list	*csl;
 			PCS_NODE_ID_T		banned_cs;
-			struct pcs_msg		msg;
-			struct pcs_cs_iohdr	hbuf;		/* Buffer for header.
-								 * A little ugly
-								 */
-			struct kvec		hbuf_kv;
+			union {
+				struct {
+					struct pcs_msg		msg;
+					struct pcs_cs_iohdr	hbuf;		/* Buffer for header.
+										 * A little ugly
+										 */
+					struct kvec		hbuf_kv;
+				};
+				struct pcs_aio_req		ar;
+			};
 		} iochunk;
 
 		struct {
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 9924cd0..ace90d7 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -1032,4 +1032,14 @@ struct fuse_secctx_header {
 	uint32_t	nr_secctx;
 };
 
+struct fuse_kio_call
+{
+	uint32_t	cmd;
+	uint32_t	len;
+	uint64_t	data;
+	uint8_t		name[32];
+};
+
+#define FUSE_IOC_KIO_CALL	_IOW('V',39,struct fuse_kio_call)
+
 #endif /* _LINUX_FUSE_H */
-- 
1.8.3.1



More information about the Devel mailing list