[Devel] [PATCH VZ9 05/20] fuse: implement fast access to encrypted CSes

Alexey Kuznetsov kuznet at virtuozzo.com
Fri Oct 6 13:42:44 MSK 2023


Pretty straightforward, but it took a lot of time to learn
the Linux crypto API; what an overengineered cryptic mess it is.
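
For reference, a minimal sketch of the sync skcipher calling
convention the new decrypt_data() path is built on (buf, key and
klen are placeholders, error handling trimmed; note the patch
itself grabs the internal "__xts(aes)"/"__ctr(aes)" algorithms and
calls the raw alg ->decrypt, while the sketch below uses the plain
wrapper API):

	/* needs <crypto/skcipher.h> and <linux/scatterlist.h> */
	struct crypto_sync_skcipher *tfm;
	struct scatterlist sg;
	u8 iv[16] = { 0 };	/* real IV is derived from chunk id/offset */
	int err;

	tfm = crypto_alloc_sync_skcipher("xts(aes)", 0, 0);
	if (IS_ERR(tfm))
		return PTR_ERR(tfm);
	err = crypto_sync_skcipher_setkey(tfm, key, klen);
	if (!err) {
		SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);

		skcipher_request_set_sync_tfm(req, tfm);
		skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP,
					      NULL, NULL);
		/* decrypt one 512-byte sector in place */
		sg_init_one(&sg, buf, 512);
		skcipher_request_set_crypt(req, &sg, &sg, 512, iv);
		err = crypto_skcipher_decrypt(req);
	}
	crypto_free_sync_skcipher(tfm);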

Also, the patch includes a substantial and welcome improvement to
submission iocount handling; the code is now more readable, optimal
and safer. The problem was raised by crypto but is not specific to
it: crypto uses a lot of stack and we never want to call it from
request context. Still, we do not want to schedule redundant work
threads either.
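
The resulting scheme, as seen in csa_submit()/pcs_csa_complete()
below: the submitter holds one iocount reference and the in-flight
AIO holds another, and whichever side drops the last reference
queues the heavy completion (crypto, crc) to a workqueue instead of
running it on its own stack:

	/* One ref is ours, the other is for AIO */
	atomic_set(&areq->iocount, 2);
	...
	/* on either path, the last ref schedules the completion work */
	if (atomic_dec_and_test(&areq->iocount)) {
		INIT_WORK(&areq->work, csa_complete_work);
		queue_work(ireq->cc->wq, &areq->work);
	}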

Also, ancient bugs were found and fixed. E.g.

	umount /mnt/vstorage
	rmmod fuse_kio_pcs

used to crash, because files opened via debugfs were not properly
refcounted. That fix should really be split out into a separate
patch; it is relevant for every kio_pcs version since the beginning.
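
The pattern of the fix, as applied to csa_fops and to the cloned
relay_file_operations below: every file_operations reachable via
debugfs must carry an .owner so that each open file pins the module:

	static const struct file_operations csa_fops = {
		.owner		= THIS_MODULE,
		...
	};

	/* relay_file_operations belongs to the relay core and is const,
	 * so clone it and set ownership before handing it to debugfs */
	ktrace_file_operations = relay_file_operations;
	ktrace_file_operations.owner = THIS_MODULE;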

Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
 fs/fuse/kio/pcs/pcs_cluster.h      |   3 +-
 fs/fuse/kio/pcs/pcs_cs.c           |   4 +-
 fs/fuse/kio/pcs/pcs_cs_accel.c     | 285 ++++++++++++++++++++++++++++---------
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  63 +++++++-
 fs/fuse/kio/pcs/pcs_ioctl.h        |   6 +
 fs/fuse/kio/pcs/pcs_map.c          |   7 +
 fs/fuse/kio/pcs/pcs_map.h          |   2 +
 fs/fuse/kio/pcs/pcs_req.h          |   7 +-
 8 files changed, 296 insertions(+), 81 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 797300c..266d8e5 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -139,7 +139,8 @@ static inline void pcs_cc_set_abort_timeout(struct pcs_cluster_core *cc, int tim
 	cc->cfg.def.abort_timeout = cc->cfg.curr.abort_timeout = timeout;
 }
 
-int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id);
+struct crypto_sync_skcipher;
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id, struct crypto_sync_skcipher *);
 int pcs_csa_init(void);
 void pcs_csa_fini(void);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 4af8f16..c518cc9 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -609,6 +609,8 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	int storage_version = atomic_read(&ireq->cc->storage_version);
 	int aligned_msg;
 
+	BUG_ON(msg->rpc);
+
 	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
 	    !(ireq->flags & IREQ_F_NO_ACCEL)) {
 		if (pcs_csa_cs_submit(cs, ireq))
@@ -616,8 +618,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	}
 
 	msg->private = cs;
-
-	BUG_ON(msg->rpc);
 	msg->private2 = ireq;
 
 	ioh = &ireq->iochunk.hbuf;
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index 24a962f..a76f28e 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -7,6 +7,8 @@
 #include <linux/anon_inodes.h>
 #include <linux/pagemap.h>
 #include <crypto/hash.h>
+#include <crypto/skcipher.h>
+#include <linux/scatterlist.h>
 
 #include "pcs_types.h"
 #include "pcs_sock_io.h"
@@ -37,13 +39,14 @@
 
 struct pcs_csa_context
 {
-	struct rcu_head		rcu;
+	struct rcu_work		rwork;
 	struct pcs_cs		*cs;  /* The reference accounted in cs->nmaps */
 	atomic_t		refcnt;
 	int			dead;
 	spinlock_t		lock;
 	wait_queue_head_t	wqh;
 	struct radix_tree_root  tree; /* GFP_ATOMIC */
+	struct crypto_sync_skcipher * tfm;
 };
 
 struct pcs_csa_entry
@@ -57,6 +60,8 @@ struct pcs_csa_entry
 	struct file		*cfile;
 };
 
+/* Interestingly, fput is irq-safe, so we can close files from an rcu callback. */
+
 static inline void __cse_destroy(struct pcs_csa_entry * cse)
 {
 	if (cse->file) {
@@ -100,17 +105,23 @@ static void csa_clear_tree(struct pcs_csa_context *ctx)
 	} while (nr);
 }
 
-static void csa_destroy_rcu(struct rcu_head *head)
+static void csa_destroy_rcu(struct work_struct *work)
 {
-	struct pcs_csa_context * ctx = container_of(head, struct pcs_csa_context, rcu);
+	struct pcs_csa_context * ctx = container_of(to_rcu_work(work), struct pcs_csa_context, rwork);
 	BUG_ON(!ctx->dead);
 	csa_clear_tree(ctx);
+	if (ctx->tfm)
+		crypto_free_sync_skcipher(ctx->tfm);
+	kfree(ctx);
 }
 
 static inline void pcs_csa_put(struct pcs_csa_context * ctx)
 {
-	if (atomic_dec_and_test(&ctx->refcnt))
-		call_rcu(&ctx->rcu, csa_destroy_rcu);
+	if (atomic_dec_and_test(&ctx->refcnt)) {
+		INIT_RCU_WORK(&ctx->rwork, csa_destroy_rcu);
+		if (!queue_rcu_work(pcs_cleanup_wq, &ctx->rwork))
+			BUG();
+	}
 }
 
 static inline void __pcs_csa_put(struct pcs_csa_context * ctx)
@@ -222,18 +233,17 @@ static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u3
 
 static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 {
-	struct iov_iter it;
-	struct pcs_int_request *parent = ireq->completion_data.parent;
-	pcs_api_iorequest_t *ar = parent->apireq.req;
+	struct iov_iter * it = &ireq->iochunk.ar.iter;
+	unsigned int size = ireq->iochunk.size;
 	char crc_desc[sizeof(struct shash_desc) + 4] __aligned(__alignof__(struct shash_desc));
 	struct shash_desc *shash = (struct shash_desc *)crc_desc;
 	int i;
 
 	shash->tfm = crc_tfm;
 
-	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, &it, 0);
+	iov_iter_revert(it, size);
 
-	for (i = 0; i < ireq->iochunk.size/4096; i++) {
+	for (i = 0; i < size/4096; i++) {
 		unsigned int left = 4096;
 		u32 ccrc;
 
@@ -244,13 +254,13 @@ static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 			int len;
 			struct page * page;
 
-			len = iov_iter_get_pages(&it, &page, left, 1, &offset);
+			len = iov_iter_get_pages(it, &page, left, 1, &offset);
 			BUG_ON(len <= 0);
 
 			crypto_shash_alg(crc_tfm)->update(shash, kmap(page) + offset, len);
 			kunmap(page);
 			put_page(page);
-			iov_iter_advance(&it, len);
+			iov_iter_advance(it, len);
 			left -= len;
 		} while (left > 0);
 
@@ -265,17 +275,114 @@ static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 	return 0;
 }
 
-static void pcs_csa_do_completion(struct pcs_aio_req *areq)
+static int check_zero(struct page * page, unsigned int offset)
 {
-	struct pcs_int_request * ireq;
+	u64 * addr = kmap(page) + offset;
+	int i;
 
-	if (!atomic_dec_and_test(&areq->iocount))
-		return;
+	for (i = 0; i < 512/8; i++) {
+		if (likely(addr[i] != 0)) {
+			kunmap(page);
+			return 0;
+		}
+	}
+	kunmap(page);
+	return 1;
+}
+
+static int decrypt_data(struct pcs_int_request * ireq,  struct crypto_sync_skcipher * tfm)
+{
+	struct iov_iter * it = &ireq->iochunk.ar.iter;
+	unsigned int size = ireq->iochunk.size;
+	struct scatterlist sg;
+	unsigned int pos;
+	struct { u64 a, b; } iv;
+	int iv_valid = 0;
+	u64 hi = ireq->iochunk.map->id;
+	/* XXX. Figure out how to distinguish xts/ctr quickly and correctly */
+	int is_ctr = (tfm->base.base.__crt_alg->cra_priority == 400);
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	iov_iter_revert(it, size);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sg, 1);
+
+	pos = 0;
+	while (pos < size) {
+		size_t offset;
+		int len;
+		struct page * page;
+
+		len = iov_iter_get_pages(it, &page, size - pos, 1, &offset);
+		BUG_ON(len <= 0);
+		BUG_ON(len & 511);
+		iov_iter_advance(it, len);
+
+		if (is_ctr) {
+			for (; len > 0; len -= 512) {
+				if (likely(!check_zero(page, offset))) {
+					if (unlikely(!iv_valid)) {
+						iv.a = hi;
+						iv.b = cpu_to_be64((ireq->iochunk.offset + pos) / 16);
+						iv_valid = 1;
+						sg_set_page(&sg, page, 512, offset);
+						skcipher_request_set_crypt(req, &sg, &sg, 512, &iv);
+					} else {
+						sg.length += 512;
+						req->cryptlen += 512;
+					}
+				} else {
+					if (iv_valid) {
+						crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+						iv_valid = 0;
+					}
+				}
+				pos += 512;
+				offset += 512;
+			}
+			if (iv_valid) {
+				crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+				iv_valid = 0;
+			}
+		} else {
+			sg_set_page(&sg, page, 512, offset);
+			skcipher_request_set_crypt(req, &sg, &sg, 512, &iv);
+			for (; len > 0; len -= 512) {
+				if (likely(!check_zero(page, offset))) {
+					iv.a = (ireq->iochunk.offset + pos) / 512;
+					iv.b = hi;
+					crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+				}
+				pos += 512;
+				offset += 512;
+				sg.offset += 512;
+			}
+		}
+		put_page(page);
+	}
+	return 0;
+}
+
+static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
+{
+	struct pcs_int_request * ireq;
 
 	fput(areq->iocb.ki_filp);
 
 	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
 
+	if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
+		struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+		struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+		if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+		}
+	}
+
 	if (areq->crc) {
 		if (!pcs_if_error(&ireq->error)) {
 			if (verify_crc(ireq, areq->crc)) {
@@ -314,7 +421,7 @@ static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 				th->type = PCS_CS_READ_RESP;
 				th->cses = 1;
 
-				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val;
+				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val | PCS_NODE_ALT_MASK;
 				ch->misc = ktime_to_us(ireq->ts_sent);
 				ch->ts_net = 0;
 				ch->ts_io = th->lat;
@@ -328,17 +435,20 @@ static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 		      ireq, (unsigned long long)ireq->iochunk.chunk,
 		      (unsigned)ireq->iochunk.offset,
 		      (unsigned)ireq->iochunk.size);
-		ireq->flags |= IREQ_F_NO_ACCEL;
+		/* Prepare ireq for restart in slow path */
+		ireq->flags |= IREQ_F_NO_ACCEL|IREQ_F_ACCELERROR;
+		ireq->flags &= ~IREQ_F_ONCE;
+		ireq->iochunk.msg.destructor = NULL;
+		ireq->iochunk.msg.rpc = NULL;
 	}
 
 	ireq_complete(ireq);
 }
 
-static void csa_complete_work(struct work_struct *w)
+static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 {
-	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
-
-	pcs_csa_do_completion(areq);
+	if (atomic_dec_and_test(&areq->iocount))
+		__pcs_csa_final_completion(areq);
 }
 
 static inline int quick_crc_fetch(struct pcs_int_request * ireq, struct file * cfile)
@@ -370,7 +480,7 @@ static void csa_crc_work(struct work_struct *w)
 	ssize_t sz;
 	loff_t pos;
 
-	if (ncrc <= PCS_MAX_INLINE_CRC)
+	if (ncrc <= PCS_MAX_INLINE_CRC*4)
 		areq->crc = areq->crcb;
 	else {
 		areq->crc = kmalloc(ncrc, GFP_KERNEL);
@@ -398,9 +508,17 @@ static void csa_crc_work(struct work_struct *w)
 		goto out;
 	}
 	fput(areq->cfile);
+	areq->cfile = NULL;
 	pcs_csa_do_completion(areq);
 }
 
+static void csa_complete_work(struct work_struct *w)
+{
+	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
+
+	__pcs_csa_final_completion(areq);
+}
+
 static void pcs_csa_complete(struct kiocb *iocb, long ret)
 {
 	struct pcs_aio_req * areq;
@@ -409,8 +527,6 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
 	areq = container_of(iocb, struct pcs_aio_req, iocb);
 	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
 
-	INIT_WORK(&areq->work, csa_complete_work);
-
 	if (ret != ireq->iochunk.size) {
 		if (!ireq->error.value) {
 			ireq->error.remote = 1;
@@ -419,7 +535,10 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
 		}
 	}
 
-	queue_work(ireq->cc->wq, &areq->work);
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
 }
 
 static inline int csa_submit(struct file * file, struct file *cfile, int do_csum, struct pcs_int_request * ireq)
@@ -428,9 +547,23 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	struct kiocb * iocb = &areq->iocb;
 	struct iov_iter * it = &areq->iter;
 	struct pcs_int_request *parent = ireq->completion_data.parent;
+	unsigned int size = ireq->iochunk.size;
 	pcs_api_iorequest_t *ar;
 	int ret;
 
+	areq->cfile = NULL;
+	areq->crc = NULL;
+
+	if (do_csum) {
+		if (cfile == NULL)
+			return -EINVAL;
+
+		if ((size|ireq->iochunk.offset) & 4095)
+			return -EINVAL;
+
+		quick_crc_fetch(ireq, cfile);
+	}
+
 	BUG_ON(parent->type != PCS_IREQ_API);
 	ar = parent->apireq.req;
 
@@ -440,7 +573,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 		return -EINVAL;
 	}
 
-	iov_iter_truncate(it, ireq->iochunk.size);
+	iov_iter_truncate(it, size);
 
 	iocb->ki_pos = ireq->iochunk.offset;
 	iocb->ki_filp = get_file(file);
@@ -448,58 +581,61 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	iocb->ki_flags = IOCB_DIRECT;
 	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
 
+	/* One ref is ours, the other is for AIO. If a crc read is needed we will grab a third */
 	atomic_set(&areq->iocount, 2);
 
-	areq->cfile = NULL;
-	areq->crc = NULL;
-
-	if (do_csum) {
-		if (cfile == NULL)
-			return -EINVAL;
-
-		if ((ireq->iochunk.size|ireq->iochunk.offset) & 4095)
-			return -EINVAL;
-
-		if (!quick_crc_fetch(ireq, cfile)) {
-			INIT_WORK(&areq->work, csa_crc_work);
-			atomic_inc(&areq->iocount);
-			areq->cfile = cfile;
-			get_file(cfile);
-		}
-	}
-
 	ireq->ts_sent = ktime_get();
 	ret = call_read_iter(file, iocb, it);
 
-	if (do_csum) {
-		if (ret == -EIOCBQUEUED || ret == ireq->iochunk.size) {
-			if (!areq->crc) {
-				FUSE_KTRACE(ireq->cc->fc, "Not a quicky");
-				queue_work(ireq->cc->wq, &areq->work);
+	if (unlikely(ret != -EIOCBQUEUED)) {
+		if (ret != size) {
+			if (!ireq->error.value) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+				ireq->error.value = PCS_ERR_IO;
 			}
-			pcs_csa_do_completion(areq);
-			return 0;
+
+			/* Do not drop refs, we do not want to complete ireq. */
+			fput(areq->iocb.ki_filp);
+			FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+				    ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+				    ireq, (unsigned long long)ireq->iochunk.chunk,
+				    (unsigned)ireq->iochunk.offset,
+				    (unsigned)size);
+			return ret >= 0 ? -EIO : ret;
 		}
-		if (!areq->crc)
-			pcs_csa_do_completion(areq);
-	}
 
-	pcs_csa_do_completion(areq);
+		/* IO already finished. Drop AIO refcnt and proceed to crc */
+		FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+			    ireq, (unsigned long long)ireq->iochunk.chunk,
+			    (unsigned)ireq->iochunk.offset,
+			    (unsigned)size);
 
-	if (ret == -EIOCBQUEUED)
-		return 0;
+		if (atomic_dec_and_test(&areq->iocount))
+			BUG();
+	}
 
-	if (ret >= 0) {
-		/* Completed synchronously. No good. */
-		FUSE_KTRACE(ireq->cc->fc, "SYNC AIO?");
-		iocb->ki_complete(iocb, ret, 0);
-		return 0;
+	/* Successful or queued read. Need to start the crc read, if it is not already available */
+	if (do_csum && !areq->crc) {
+		FUSE_KTRACE(ireq->cc->fc, "Not a quicky crc");
+		INIT_WORK(&areq->work, csa_crc_work);
+		/* Grab ref for crc read work */
+		atomic_inc(&areq->iocount);
+		areq->cfile = cfile;
+		get_file(cfile);
+		queue_work(ireq->cc->wq, &areq->work);
 	}
 
-	/* Synchronous error. */
-	fput(areq->iocb.ki_filp);
-	FUSE_KTRACE(ireq->cc->fc, "AIO sync errno %d, falling back", ret);
-	return -ret;
+	/* Why not pcs_csa_do_completion()? Because we do not want to execute the
+	 * real completion on the caller's stack: crypto is a stack hog. Normally
+	 * iocount > 1 here, but if all the IO happened to complete quickly enough
+	 * (or even synchronously), this is the last ref.
+	 */
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+	return 0;
 }
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
@@ -513,8 +649,13 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 		    (csa->flags & PCS_CSA_FL_READ)) {
 			/* XXX Paranoia? Verify! */
 			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				if (csa_ctx->tfm)
+					ireq->flags |= IREQ_F_CRYPT;
 				if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
 					return 1;
+				/* Clear state which could be rewritten by csa_submit */
+				ireq->iochunk.msg.destructor = NULL;
+				ireq->iochunk.msg.rpc = NULL;
 			}
 		}
 	}
@@ -592,18 +733,18 @@ static int csa_release(struct inode *inode, struct file *file)
 	}
 	wake_up_poll(&ctx->wqh, EPOLLHUP);
 	pcs_csa_put(ctx);
-	module_put(THIS_MODULE);
 	return 0;
 }
 
 static const struct file_operations csa_fops = {
+	.owner		= THIS_MODULE,
 	.release	= csa_release,
 	.poll		= csa_poll,
 	.unlocked_ioctl	= csa_ioctl,
 	.llseek		= noop_llseek,
 };
 
-int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id, struct crypto_sync_skcipher * tfm)
 {
 	int fd;
 	struct pcs_cs * cs;
@@ -648,10 +789,14 @@ int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
 		goto out;
 	}
 	atomic_inc(&csa_ctx->refcnt);
+	csa_ctx->tfm = tfm;
 	cs->csa_ctx = csa_ctx;
 	spin_unlock(&cs->lock);
 	fd_install(fd, file);
-	__module_get(THIS_MODULE);
+
+	/* Not good, but handy: people will forget to enable it otherwise, no doubt */
+	if (!cs_io_locality)
+		cs_io_locality = 1;
 	return fd;
 
 out:
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 4bbabfd..39a6aaf 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -29,6 +29,7 @@
 #include <linux/debugfs.h>
 #include <linux/fiemap.h>
 #include <crypto/hash.h>
+#include <crypto/skcipher.h>
 
 #include "pcs_ioctl.h"
 #include "pcs_cluster.h"
@@ -283,7 +284,9 @@ static void kpcs_conn_fini(struct fuse_mount *fm)
 
 	TRACE("%s fc:%p\n", __FUNCTION__, fc);
 	unregister_client(fc->kio.ctx);
+	synchronize_rcu();
 	flush_workqueue(pcs_wq);
+	flush_workqueue(pcs_cleanup_wq);
 	pcs_cluster_fini((struct pcs_fuse_cluster *) fc->kio.ctx);
 }
 
@@ -1284,6 +1287,8 @@ static void kpcs_req_send(struct fuse_req *req, bool bg)
 	return;
 }
 
+static struct file_operations ktrace_file_operations;
+
 static void fuse_rpc_error_metrics_clean(struct fuse_error_metrics *metrics);
 
 static void fuse_trace_free(struct fuse_ktrace *tr)
@@ -1329,7 +1334,7 @@ static struct dentry * create_buf_file_callback(const char *filename,
 						int *is_global)
 {
 	return debugfs_create_file(filename, mode, parent, buf,
-				   &relay_file_operations);
+				   &ktrace_file_operations);
 }
 
 static int remove_buf_file_callback(struct dentry *dentry)
@@ -1571,6 +1576,7 @@ static ssize_t prometheus_file_read(struct file *filp,
 }
 
 static const struct file_operations prometheus_file_operations = {
+	.owner		= THIS_MODULE,
 	.open		= prometheus_file_open,
 	.read		= prometheus_file_read,
 	.release	= prometheus_file_release,
@@ -1752,6 +1758,7 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 	struct fuse_inode *fi = NULL;
 	struct pcs_dentry_info *di = NULL;
 	struct pcs_fuse_cluster *pfc;
+	struct crypto_sync_skcipher * tfm = NULL;
 	struct fuse_pcs_ioc_register req;
 	int res;
 
@@ -1788,9 +1795,6 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 	if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
 		return -EFAULT;
 
-	if (req.crypto_algo)
-		return -EOPNOTSUPP;
-
 	if (fc) {
 		pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
 		if (memcmp(&req.cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)))
@@ -1805,10 +1809,55 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 			return -ENXIO;
 	}
 
-	res = pcs_csa_register(&pfc->cc, req.cs_id);
+	if (req.crypto_algo) {
+		u64 key_data[8];
+		int klen = req.crypto_algo & PCS_CSA_EMASK_KEYLEN;
+
+		res = -EINVAL;
+		if (klen > 64)
+			goto out;
+		res = -EFAULT;
+		if (copy_from_user(&key_data, (void __user *)req.key_data, klen))
+			goto out;
+		switch (req.crypto_algo & PCS_CSA_EMASK_KEYTYPE) {
+		case PCS_CSA_EMASK_XTS:
+			tfm = crypto_alloc_sync_skcipher("__xts(aes)", CRYPTO_ALG_INTERNAL, 0);
+			break;
+		case PCS_CSA_EMASK_CTR:
+			tfm = crypto_alloc_sync_skcipher("__ctr(aes)", CRYPTO_ALG_INTERNAL, 0);
+			break;
+		}
+		res = -EINVAL;
+		if (!tfm)
+			goto out;
+		if (IS_ERR(tfm)) {
+			printk("crypto_alloc_sync_skcipher: %ld\n", PTR_ERR(tfm));
+			res = PTR_ERR(tfm);
+			goto out;
+		}
+		if (tfm->base.base.__crt_alg->cra_priority != 400 &&
+		    tfm->base.base.__crt_alg->cra_priority != 401) {
+			printk("crypto drv=%s name=%s prio=%d\n", tfm->base.base.__crt_alg->cra_driver_name,
+			       tfm->base.base.__crt_alg->cra_name, tfm->base.base.__crt_alg->cra_priority);
+			res = -EINVAL;
+			goto out;
+		}
+		res = crypto_sync_skcipher_setkey(tfm, (u8*)&key_data, klen);
+		if (res < 0) {
+			printk("crypto_sync_skcipher_setkey: %d\n", res);
+			goto out;
+		}
+	}
+
+	res = pcs_csa_register(&pfc->cc, req.cs_id, tfm);
 
+out:
 	if (!inode)
 		fuse_conn_put(fc);
+
+	if (res < 0 && tfm)
+		crypto_free_sync_skcipher(tfm);
+
 	return res;
 }
 
@@ -1873,6 +1922,10 @@ static int __init kpcs_mod_init(void)
 	if (fuse_register_kio(&kio_pcs_ops))
 		goto free_csa;
 
+	/* Clone relay_file_operations to set ownership */
+	ktrace_file_operations = relay_file_operations;
+	ktrace_file_operations.owner = THIS_MODULE;
+
 	fuse_trace_root = debugfs_create_dir("fuse", NULL);
 
 	crc_tfm = crypto_alloc_shash("crc32c", 0, 0);
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 5972873..8e55be0 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -99,6 +99,12 @@ struct fuse_pcs_ioc_register
 	u64			key_data;
 };
 
+#define PCS_CSA_EMASK_KEYLEN	0xFFFF
+#define PCS_CSA_EMASK_KEYTYPE	0xF0000
+
+#define PCS_CSA_EMASK_XTS	(1<<16)
+#define PCS_CSA_EMASK_CTR	(2<<16)
+
 struct pcs_csa_setmap
 {
 	PCS_CHUNK_UID_T		chunk_id;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 0568d79..9dc1c95 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -1363,6 +1363,13 @@ static void map_read_error(struct pcs_int_request *ireq)
 	if (csl == NULL || csl->map == NULL || (csl->map->state & PCS_MAP_ERROR))
 		return;
 
+	if (ireq->flags & IREQ_F_ACCELERROR) {
+		pcs_clear_error(&ireq->error);
+		ireq->flags &= ~IREQ_F_ACCELERROR;
+		ireq->flags |= IREQ_F_NO_ACCEL;
+		return;
+	}
+
 	cs = rcu_dereference_protected(csl->cs[ireq->iochunk.cs_index].cslink.cs,
 				       atomic_read(&csl->refcnt) > 0);
 
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 4bab867..e2b3c14 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -219,6 +219,8 @@ static inline struct pcs_cluster_core *cc_from_map(struct pcs_map_entry * m)
 unsigned long pcs_map_shrink_scan(struct shrinker *,  struct shrink_control *sc);
 void ireq_drop_tokens(struct pcs_int_request * ireq);
 
+extern unsigned int cs_io_locality;
+
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,	 (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
 
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 7c86f5d..68cf270 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -53,11 +53,10 @@ struct pcs_aio_req
 	struct iov_iter 	iter;
 	struct work_struct	work;
 
-#define PCS_MAX_INLINE_CRC	2
-	u32    			crcb[PCS_MAX_INLINE_CRC];
 	u32    			*crc;
 	struct file		*cfile;
-	struct work_struct	cwork;
+#define PCS_MAX_INLINE_CRC	32
+	u32    			crcb[PCS_MAX_INLINE_CRC];
 };
 
 struct pcs_int_request
@@ -83,6 +82,8 @@ struct pcs_int_request
 #define IREQ_F_WB_SUSP		0x400
 #define IREQ_F_RECV_SPLICE	0x800
 #define IREQ_F_NO_ACCEL		0x1000
+#define IREQ_F_CRYPT		0x2000
+#define IREQ_F_ACCELERROR	0x4000
 
 	atomic_t		iocount;
 
-- 
1.8.3.1


