[Devel] [PATCH RHEL9 COMMIT] fuse: implement fast access to encrypted CSes

Konstantin Khorenko khorenko at virtuozzo.com
Wed Nov 1 22:44:08 MSK 2023


The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-284.25.1.vz9.30.8
------>
commit fb25ea858b2db2e7733b7b5298b2937e21a8d1b6
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Fri Oct 6 18:42:44 2023 +0800

    fuse: implement fast access to encrypted CSes
    
    Pretty straightforward, but it took a lot of time to learn the
    Linux crypto API, the overengineered cryptic mess that it is.
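
    For reference, a minimal sketch of the sync skcipher usage this
    patch builds on (illustrative only: the patch itself allocates the
    internal "__xts(aes)"/"__ctr(aes)" algorithms with
    CRYPTO_ALG_INTERNAL and calls the alg->decrypt hook directly, see
    pcs_cs_accel.c below):

      #include <crypto/skcipher.h>
      #include <linux/scatterlist.h>

      /* Decrypt one 512-byte sector of @page in place. */
      static int decrypt_sector(struct crypto_sync_skcipher *tfm,
                                struct page *page, unsigned int off,
                                u8 iv[16])
      {
              struct scatterlist sg;
              SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);

              sg_init_table(&sg, 1);
              sg_set_page(&sg, page, 512, off);
              skcipher_request_set_sync_tfm(req, tfm);
              skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP,
                                            NULL, NULL);
              skcipher_request_set_crypt(req, &sg, &sg, 512, iv);
              return crypto_skcipher_decrypt(req);
      }

      /* Setup/teardown:
       *   tfm = crypto_alloc_sync_skcipher("xts(aes)", 0, 0);
       *   crypto_sync_skcipher_setkey(tfm, key, 64);
       *   ...
       *   crypto_free_sync_skcipher(tfm);
       */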
    
    Also, the patch includes a substantial improvement to submission
    iocount handling; the code is now more readable, closer to
    optimal and safer. The problem was raised by crypto but is not
    specific to it: crypto uses a lot of stack, so we never want to
    call it from request context. At the same time, we do not want
    to schedule redundant work items.
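
    A condensed sketch of the resulting refcount scheme (names as in
    pcs_cs_accel.c below, simplified):

      /* Submit holds one ref and the AIO completion holds another;
       * a crc read, if needed, grabs a third.  Whoever drops the
       * last ref defers the heavy final completion to a workqueue
       * rather than running it on its own (possibly deep) stack:
       */
      atomic_set(&areq->iocount, 2);
      ...
      if (atomic_dec_and_test(&areq->iocount)) {
              INIT_WORK(&areq->work, csa_complete_work);
              queue_work(ireq->cc->wq, &areq->work);
      }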
    
    Also, ancient bugs were found and fixed. For example,
      umount /mnt/vstorage
      rmmod fuse_kio_pcs
    used to crash because files opened via debugfs were not properly
    refcounted. That fix should be split out into a separate patch;
    it applies to all kio pcs versions since the beginning.
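
    The fix, as done in pcs_fuse_kdirect.c below, is to give the
    debugfs files a file_operations with .owner set, so every open
    file pins the module; for the relay buffer files this means
    cloning relay_file_operations at module init:

      static struct file_operations ktrace_file_operations;

      /* at module init */
      ktrace_file_operations = relay_file_operations;
      ktrace_file_operations.owner = THIS_MODULE;

      /* relay buffer files are then created with the clone */
      debugfs_create_file(filename, mode, parent, buf,
                          &ktrace_file_operations);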
    
    https://pmc.acronis.work/browse/VSTOR-54040
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: vStorage
---
 fs/fuse/kio/pcs/pcs_cluster.h      |   3 +-
 fs/fuse/kio/pcs/pcs_cs.c           |   4 +-
 fs/fuse/kio/pcs/pcs_cs_accel.c     | 285 ++++++++++++++++++++++++++++---------
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  63 +++++++-
 fs/fuse/kio/pcs/pcs_ioctl.h        |   6 +
 fs/fuse/kio/pcs/pcs_map.c          |   7 +
 fs/fuse/kio/pcs/pcs_map.h          |   2 +
 fs/fuse/kio/pcs/pcs_req.h          |   7 +-
 8 files changed, 296 insertions(+), 81 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 797300c0ffca..266d8e5e2551 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -139,7 +139,8 @@ static inline void pcs_cc_set_abort_timeout(struct pcs_cluster_core *cc, int tim
 	cc->cfg.def.abort_timeout = cc->cfg.curr.abort_timeout = timeout;
 }
 
-int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id);
+struct crypto_sync_skcipher;
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id, struct crypto_sync_skcipher *);
 int pcs_csa_init(void);
 void pcs_csa_fini(void);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 4af8f1697d3c..c518cc9792a4 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -609,6 +609,8 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	int storage_version = atomic_read(&ireq->cc->storage_version);
 	int aligned_msg;
 
+	BUG_ON(msg->rpc);
+
 	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
 	    !(ireq->flags & IREQ_F_NO_ACCEL)) {
 		if (pcs_csa_cs_submit(cs, ireq))
@@ -616,8 +618,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	}
 
 	msg->private = cs;
-
-	BUG_ON(msg->rpc);
 	msg->private2 = ireq;
 
 	ioh = &ireq->iochunk.hbuf;
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index 24a962f8beb9..a76f28e2ae85 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -7,6 +7,8 @@
 #include <linux/anon_inodes.h>
 #include <linux/pagemap.h>
 #include <crypto/hash.h>
+#include <crypto/skcipher.h>
+#include <linux/scatterlist.h>
 
 #include "pcs_types.h"
 #include "pcs_sock_io.h"
@@ -37,13 +39,14 @@ struct kmem_cache *pcs_csa_cachep;
 
 struct pcs_csa_context
 {
-	struct rcu_head		rcu;
+	struct rcu_work		rwork;
 	struct pcs_cs		*cs;  /* The reference accounted in cs->nmaps */
 	atomic_t		refcnt;
 	int			dead;
 	spinlock_t		lock;
 	wait_queue_head_t	wqh;
 	struct radix_tree_root  tree; /* GFP_ATOMIC */
+	struct crypto_sync_skcipher * tfm;
 };
 
 struct pcs_csa_entry
@@ -57,6 +60,8 @@ struct pcs_csa_entry
 	struct file		*cfile;
 };
 
+/* Interestingly, fput() is irq-safe, so we can close files from an RCU callback. */
+
 static inline void __cse_destroy(struct pcs_csa_entry * cse)
 {
 	if (cse->file) {
@@ -100,17 +105,23 @@ static void csa_clear_tree(struct pcs_csa_context *ctx)
 	} while (nr);
 }
 
-static void csa_destroy_rcu(struct rcu_head *head)
+static void csa_destroy_rcu(struct work_struct *work)
 {
-	struct pcs_csa_context * ctx = container_of(head, struct pcs_csa_context, rcu);
+	struct pcs_csa_context * ctx = container_of(to_rcu_work(work), struct pcs_csa_context, rwork);
 	BUG_ON(!ctx->dead);
 	csa_clear_tree(ctx);
+	if (ctx->tfm)
+		crypto_free_sync_skcipher(ctx->tfm);
+	kfree(ctx);
 }
 
 static inline void pcs_csa_put(struct pcs_csa_context * ctx)
 {
-	if (atomic_dec_and_test(&ctx->refcnt))
-		call_rcu(&ctx->rcu, csa_destroy_rcu);
+	if (atomic_dec_and_test(&ctx->refcnt)) {
+		INIT_RCU_WORK(&ctx->rwork, csa_destroy_rcu);
+		if (!queue_rcu_work(pcs_cleanup_wq, &ctx->rwork))
+			BUG();
+	}
 }
 
 static inline void __pcs_csa_put(struct pcs_csa_context * ctx)
@@ -222,18 +233,17 @@ static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u3
 
 static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 {
-	struct iov_iter it;
-	struct pcs_int_request *parent = ireq->completion_data.parent;
-	pcs_api_iorequest_t *ar = parent->apireq.req;
+	struct iov_iter * it = &ireq->iochunk.ar.iter;
+	unsigned int size = ireq->iochunk.size;
 	char crc_desc[sizeof(struct shash_desc) + 4] __aligned(__alignof__(struct shash_desc));
 	struct shash_desc *shash = (struct shash_desc *)crc_desc;
 	int i;
 
 	shash->tfm = crc_tfm;
 
-	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, &it, 0);
+	iov_iter_revert(it, size);
 
-	for (i = 0; i < ireq->iochunk.size/4096; i++) {
+	for (i = 0; i < size/4096; i++) {
 		unsigned int left = 4096;
 		u32 ccrc;
 
@@ -244,13 +254,13 @@ static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 			int len;
 			struct page * page;
 
-			len = iov_iter_get_pages(&it, &page, left, 1, &offset);
+			len = iov_iter_get_pages(it, &page, left, 1, &offset);
 			BUG_ON(len <= 0);
 
 			crypto_shash_alg(crc_tfm)->update(shash, kmap(page) + offset, len);
 			kunmap(page);
 			put_page(page);
-			iov_iter_advance(&it, len);
+			iov_iter_advance(it, len);
 			left -= len;
 		} while (left > 0);
 
@@ -265,17 +275,114 @@ static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 	return 0;
 }
 
-static void pcs_csa_do_completion(struct pcs_aio_req *areq)
+static int check_zero(struct page * page, unsigned int offset)
 {
-	struct pcs_int_request * ireq;
+	u64 * addr = kmap(page) + offset;
+	int i;
 
-	if (!atomic_dec_and_test(&areq->iocount))
-		return;
+	for (i = 0; i < 512/8; i++) {
+		if (likely(addr[i] != 0)) {
+			kunmap(page);
+			return 0;
+		}
+	}
+	kunmap(page);
+	return 1;
+}
+
+static int decrypt_data(struct pcs_int_request * ireq,  struct crypto_sync_skcipher * tfm)
+{
+	struct iov_iter * it = &ireq->iochunk.ar.iter;
+	unsigned int size = ireq->iochunk.size;
+	struct scatterlist sg;
+	unsigned int pos;
+	struct { u64 a, b; } iv;
+	int iv_valid = 0;
+	u64 hi = ireq->iochunk.map->id;
+	/* XXX. Figure out how to distinguish xts/ctr quickly and correctly */
+	int is_ctr = (tfm->base.base.__crt_alg->cra_priority == 400);
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	iov_iter_revert(it, size);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sg, 1);
+
+	pos = 0;
+	while (pos < size) {
+		size_t offset;
+		int len;
+		struct page * page;
+
+		len = iov_iter_get_pages(it, &page, size - pos, 1, &offset);
+		BUG_ON(len <= 0);
+		BUG_ON(len & 511);
+		iov_iter_advance(it, len);
+
+		if (is_ctr) {
+			for (; len > 0; len -= 512) {
+				if (likely(!check_zero(page, offset))) {
+					if (unlikely(!iv_valid)) {
+						iv.a = hi;
+						iv.b = cpu_to_be64((ireq->iochunk.offset + pos) / 16);
+						iv_valid = 1;
+						sg_set_page(&sg, page, 512, offset);
+						skcipher_request_set_crypt(req, &sg, &sg, 512, &iv);
+					} else {
+						sg.length += 512;
+						req->cryptlen += 512;
+					}
+				} else {
+					if (iv_valid) {
+						crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+						iv_valid = 0;
+					}
+				}
+				pos += 512;
+				offset += 512;
+			}
+			if (iv_valid) {
+				crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+				iv_valid = 0;
+			}
+		} else {
+			sg_set_page(&sg, page, 512, offset);
+			skcipher_request_set_crypt(req, &sg, &sg, 512, &iv);
+			for (; len > 0; len -= 512) {
+				if (likely(!check_zero(page, offset))) {
+					iv.a = (ireq->iochunk.offset + pos) / 512;
+					iv.b = hi;
+					crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->decrypt(req);
+				}
+				pos += 512;
+				offset += 512;
+				sg.offset += 512;
+			}
+		}
+		put_page(page);
+	}
+	return 0;
+}
+
+static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
+{
+	struct pcs_int_request * ireq;
 
 	fput(areq->iocb.ki_filp);
 
 	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
 
+	if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
+		struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+		struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+		if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+		}
+	}
+
 	if (areq->crc) {
 		if (!pcs_if_error(&ireq->error)) {
 			if (verify_crc(ireq, areq->crc)) {
@@ -314,7 +421,7 @@ static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 				th->type = PCS_CS_READ_RESP;
 				th->cses = 1;
 
-				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val;
+				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val | PCS_NODE_ALT_MASK;
 				ch->misc = ktime_to_us(ireq->ts_sent);
 				ch->ts_net = 0;
 				ch->ts_io = th->lat;
@@ -328,17 +435,20 @@ static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 		      ireq, (unsigned long long)ireq->iochunk.chunk,
 		      (unsigned)ireq->iochunk.offset,
 		      (unsigned)ireq->iochunk.size);
-		ireq->flags |= IREQ_F_NO_ACCEL;
+		/* Prepare ireq for restart in slow path */
+		ireq->flags |= IREQ_F_NO_ACCEL|IREQ_F_ACCELERROR;
+		ireq->flags &= ~IREQ_F_ONCE;
+		ireq->iochunk.msg.destructor = NULL;
+		ireq->iochunk.msg.rpc = NULL;
 	}
 
 	ireq_complete(ireq);
 }
 
-static void csa_complete_work(struct work_struct *w)
+static void pcs_csa_do_completion(struct pcs_aio_req *areq)
 {
-	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
-
-	pcs_csa_do_completion(areq);
+	if (atomic_dec_and_test(&areq->iocount))
+		__pcs_csa_final_completion(areq);
 }
 
 static inline int quick_crc_fetch(struct pcs_int_request * ireq, struct file * cfile)
@@ -370,7 +480,7 @@ static void csa_crc_work(struct work_struct *w)
 	ssize_t sz;
 	loff_t pos;
 
-	if (ncrc <= PCS_MAX_INLINE_CRC)
+	if (ncrc <= PCS_MAX_INLINE_CRC*4)
 		areq->crc = areq->crcb;
 	else {
 		areq->crc = kmalloc(ncrc, GFP_KERNEL);
@@ -398,9 +508,17 @@ static void csa_crc_work(struct work_struct *w)
 		goto out;
 	}
 	fput(areq->cfile);
+	areq->cfile = NULL;
 	pcs_csa_do_completion(areq);
 }
 
+static void csa_complete_work(struct work_struct *w)
+{
+	struct pcs_aio_req * areq = container_of(w, struct pcs_aio_req, work);
+
+	__pcs_csa_final_completion(areq);
+}
+
 static void pcs_csa_complete(struct kiocb *iocb, long ret)
 {
 	struct pcs_aio_req * areq;
@@ -409,8 +527,6 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
 	areq = container_of(iocb, struct pcs_aio_req, iocb);
 	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
 
-	INIT_WORK(&areq->work, csa_complete_work);
-
 	if (ret != ireq->iochunk.size) {
 		if (!ireq->error.value) {
 			ireq->error.remote = 1;
@@ -419,7 +535,10 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
 		}
 	}
 
-	queue_work(ireq->cc->wq, &areq->work);
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
 }
 
 static inline int csa_submit(struct file * file, struct file *cfile, int do_csum, struct pcs_int_request * ireq)
@@ -428,9 +547,23 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	struct kiocb * iocb = &areq->iocb;
 	struct iov_iter * it = &areq->iter;
 	struct pcs_int_request *parent = ireq->completion_data.parent;
+	unsigned int size = ireq->iochunk.size;
 	pcs_api_iorequest_t *ar;
 	int ret;
 
+	areq->cfile = NULL;
+	areq->crc = NULL;
+
+	if (do_csum) {
+		if (cfile == NULL)
+			return -EINVAL;
+
+		if ((size|ireq->iochunk.offset) & 4095)
+			return -EINVAL;
+
+		quick_crc_fetch(ireq, cfile);
+	}
+
 	BUG_ON(parent->type != PCS_IREQ_API);
 	ar = parent->apireq.req;
 
@@ -440,7 +573,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 		return -EINVAL;
 	}
 
-	iov_iter_truncate(it, ireq->iochunk.size);
+	iov_iter_truncate(it, size);
 
 	iocb->ki_pos = ireq->iochunk.offset;
 	iocb->ki_filp = get_file(file);
@@ -448,58 +581,61 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	iocb->ki_flags = IOCB_DIRECT;
 	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
 
+	/* One ref is ours, the other is for AIO. If a crc read is needed we will grab a third. */
 	atomic_set(&areq->iocount, 2);
 
-	areq->cfile = NULL;
-	areq->crc = NULL;
-
-	if (do_csum) {
-		if (cfile == NULL)
-			return -EINVAL;
-
-		if ((ireq->iochunk.size|ireq->iochunk.offset) & 4095)
-			return -EINVAL;
-
-		if (!quick_crc_fetch(ireq, cfile)) {
-			INIT_WORK(&areq->work, csa_crc_work);
-			atomic_inc(&areq->iocount);
-			areq->cfile = cfile;
-			get_file(cfile);
-		}
-	}
-
 	ireq->ts_sent = ktime_get();
 	ret = call_read_iter(file, iocb, it);
 
-	if (do_csum) {
-		if (ret == -EIOCBQUEUED || ret == ireq->iochunk.size) {
-			if (!areq->crc) {
-				FUSE_KTRACE(ireq->cc->fc, "Not a quicky");
-				queue_work(ireq->cc->wq, &areq->work);
+	if (unlikely(ret != -EIOCBQUEUED)) {
+		if (ret != size) {
+			if (!ireq->error.value) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+				ireq->error.value = PCS_ERR_IO;
 			}
-			pcs_csa_do_completion(areq);
-			return 0;
+
+			/* Do not drop refs, we do not want to complete ireq. */
+			fput(areq->iocb.ki_filp);
+			FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+				    ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+				    ireq, (unsigned long long)ireq->iochunk.chunk,
+				    (unsigned)ireq->iochunk.offset,
+				    (unsigned)size);
+			return ret >= 0 ? -EIO : ret;
 		}
-		if (!areq->crc)
-			pcs_csa_do_completion(areq);
-	}
 
-	pcs_csa_do_completion(areq);
+		/* IO already finished. Drop AIO refcnt and proceed to crc */
+		FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+			    ireq, (unsigned long long)ireq->iochunk.chunk,
+			    (unsigned)ireq->iochunk.offset,
+			    (unsigned)size);
 
-	if (ret == -EIOCBQUEUED)
-		return 0;
+		if (atomic_dec_and_test(&areq->iocount))
+			BUG();
+	}
 
-	if (ret >= 0) {
-		/* Completed synchronously. No good. */
-		FUSE_KTRACE(ireq->cc->fc, "SYNC AIO?");
-		iocb->ki_complete(iocb, ret, 0);
-		return 0;
+	/* Successful or queued read. Start the crc read unless it is already available. */
+	if (do_csum && !areq->crc) {
+		FUSE_KTRACE(ireq->cc->fc, "Not a quicky crc");
+		INIT_WORK(&areq->work, csa_crc_work);
+		/* Grab ref for crc read work */
+		atomic_inc(&areq->iocount);
+		areq->cfile = cfile;
+		get_file(cfile);
+		queue_work(ireq->cc->wq, &areq->work);
 	}
 
-	/* Synchronous error. */
-	fput(areq->iocb.ki_filp);
-	FUSE_KTRACE(ireq->cc->fc, "AIO sync errno %d, falling back", ret);
-	return -ret;
+	/* Why not pcs_csa_do_completion()? Because we do not want to execute the real
+	 * completion on the caller's stack: crypto is a stack hog. Normally iocount > 1
+	 * here, but if all the IO happened to complete quickly enough (or even
+	 * synchronously), this is the last ref.
+	 */
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+	return 0;
 }
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
@@ -513,8 +649,13 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 		    (csa->flags & PCS_CSA_FL_READ)) {
 			/* XXX Paranoia? Verify! */
 			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				if (csa_ctx->tfm)
+					ireq->flags |= IREQ_F_CRYPT;
 				if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
 					return 1;
+				/* Clear state which could be rewritten by csa_submit */
+				ireq->iochunk.msg.destructor = NULL;
+				ireq->iochunk.msg.rpc = NULL;
 			}
 		}
 	}
@@ -592,18 +733,18 @@ static int csa_release(struct inode *inode, struct file *file)
 	}
 	wake_up_poll(&ctx->wqh, EPOLLHUP);
 	pcs_csa_put(ctx);
-	module_put(THIS_MODULE);
 	return 0;
 }
 
 static const struct file_operations csa_fops = {
+	.owner		= THIS_MODULE,
 	.release	= csa_release,
 	.poll		= csa_poll,
 	.unlocked_ioctl	= csa_ioctl,
 	.llseek		= noop_llseek,
 };
 
-int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
+int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id, struct crypto_sync_skcipher * tfm)
 {
 	int fd;
 	struct pcs_cs * cs;
@@ -648,10 +789,14 @@ int pcs_csa_register(struct pcs_cluster_core * cc, PCS_NODE_ID_T cs_id)
 		goto out;
 	}
 	atomic_inc(&csa_ctx->refcnt);
+	csa_ctx->tfm = tfm;
 	cs->csa_ctx = csa_ctx;
 	spin_unlock(&cs->lock);
 	fd_install(fd, file);
-	__module_get(THIS_MODULE);
+
+	/* Not good, but handy: people will forget to enable this, no doubt. */
+	if (!cs_io_locality)
+		cs_io_locality = 1;
 	return fd;
 
 out:
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 4bbabfd3d5fc..39a6aaf49ab7 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -29,6 +29,7 @@
 #include <linux/debugfs.h>
 #include <linux/fiemap.h>
 #include <crypto/hash.h>
+#include <crypto/skcipher.h>
 
 #include "pcs_ioctl.h"
 #include "pcs_cluster.h"
@@ -283,7 +284,9 @@ static void kpcs_conn_fini(struct fuse_mount *fm)
 
 	TRACE("%s fc:%p\n", __FUNCTION__, fc);
 	unregister_client(fc->kio.ctx);
+	synchronize_rcu();
 	flush_workqueue(pcs_wq);
+	flush_workqueue(pcs_cleanup_wq);
 	pcs_cluster_fini((struct pcs_fuse_cluster *) fc->kio.ctx);
 }
 
@@ -1284,6 +1287,8 @@ static void kpcs_req_send(struct fuse_req *req, bool bg)
 	return;
 }
 
+static struct file_operations ktrace_file_operations;
+
 static void fuse_rpc_error_metrics_clean(struct fuse_error_metrics *metrics);
 
 static void fuse_trace_free(struct fuse_ktrace *tr)
@@ -1329,7 +1334,7 @@ static struct dentry * create_buf_file_callback(const char *filename,
 						int *is_global)
 {
 	return debugfs_create_file(filename, mode, parent, buf,
-				   &relay_file_operations);
+				   &ktrace_file_operations);
 }
 
 static int remove_buf_file_callback(struct dentry *dentry)
@@ -1571,6 +1576,7 @@ static ssize_t prometheus_file_read(struct file *filp,
 }
 
 static const struct file_operations prometheus_file_operations = {
+	.owner		= THIS_MODULE,
 	.open		= prometheus_file_open,
 	.read		= prometheus_file_read,
 	.release	= prometheus_file_release,
@@ -1752,6 +1758,7 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 	struct fuse_inode *fi = NULL;
 	struct pcs_dentry_info *di = NULL;
 	struct pcs_fuse_cluster *pfc;
+	struct crypto_sync_skcipher * tfm = NULL;
 	struct fuse_pcs_ioc_register req;
 	int res;
 
@@ -1788,9 +1795,6 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 	if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
 		return -EFAULT;
 
-	if (req.crypto_algo)
-		return -EOPNOTSUPP;
-
 	if (fc) {
 		pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
 		if (memcmp(&req.cluster_id, &pfc->cc.eng.cluster_id, sizeof(PCS_CLUSTER_ID_T)))
@@ -1805,10 +1809,55 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
 			return -ENXIO;
 	}
 
-	res = pcs_csa_register(&pfc->cc, req.cs_id);
+	if (req.crypto_algo) {
+		u64 key_data[8];
+		int klen = req.crypto_algo & PCS_CSA_EMASK_KEYLEN;
+
+		res = -EINVAL;
+		if (klen > 64)
+			goto out;
+		res = -EFAULT;
+		if (copy_from_user(&key_data, (void __user *)req.key_data, klen))
+			goto out;
+		switch (req.crypto_algo & PCS_CSA_EMASK_KEYTYPE) {
+		case PCS_CSA_EMASK_XTS:
+			tfm = crypto_alloc_sync_skcipher("__xts(aes)", CRYPTO_ALG_INTERNAL, 0);
+			break;
+		case PCS_CSA_EMASK_CTR:
+			tfm = crypto_alloc_sync_skcipher("__ctr(aes)", CRYPTO_ALG_INTERNAL, 0);
+			break;
+		}
+		res = -EINVAL;
+		if (!tfm)
+			goto out;
+		if (IS_ERR(tfm)) {
+			printk("crypto_alloc_sync_skcipher: %ld\n", PTR_ERR(tfm));
+			res = PTR_ERR(tfm);
+			goto out;
+		}
+		if (tfm->base.base.__crt_alg->cra_priority != 400 &&
+		    tfm->base.base.__crt_alg->cra_priority != 401) {
+			printk("crypto drv=%s name=%s prio=%d\n", tfm->base.base.__crt_alg->cra_driver_name,
+			       tfm->base.base.__crt_alg->cra_name, tfm->base.base.__crt_alg->cra_priority);
+			res = -EINVAL;
+			goto out;
+		}
+		res = crypto_sync_skcipher_setkey(tfm, (u8*)&key_data, klen);
+		if (res < 0) {
+			printk("crypto_sync_skcipher_setkey: %d\n", res);
+			goto out;
+		}
+	}
+
+	res = pcs_csa_register(&pfc->cc, req.cs_id, tfm);
 
+out:
 	if (!inode)
 		fuse_conn_put(fc);
+
+	if (res < 0 && tfm)
+		crypto_free_sync_skcipher(tfm);
+
 	return res;
 }
 
@@ -1873,6 +1922,10 @@ static int __init kpcs_mod_init(void)
 	if (fuse_register_kio(&kio_pcs_ops))
 		goto free_csa;
 
+	/* Clone relay_file_operations to set ownership */
+	ktrace_file_operations = relay_file_operations;
+	ktrace_file_operations.owner = THIS_MODULE;
+
 	fuse_trace_root = debugfs_create_dir("fuse", NULL);
 
 	crc_tfm = crypto_alloc_shash("crc32c", 0, 0);
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 5972873c9208..8e55be02c654 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -99,6 +99,12 @@ struct fuse_pcs_ioc_register
 	u64			key_data;
 };
 
+#define PCS_CSA_EMASK_KEYLEN	0xFFFF
+#define PCS_CSA_EMASK_KEYTYPE	0xF0000
+
+#define PCS_CSA_EMASK_XTS	(1<<16)
+#define PCS_CSA_EMASK_CTR	(2<<16)
+
 struct pcs_csa_setmap
 {
 	PCS_CHUNK_UID_T		chunk_id;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 0568d7966072..9dc1c95733fd 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -1363,6 +1363,13 @@ static void map_read_error(struct pcs_int_request *ireq)
 	if (csl == NULL || csl->map == NULL || (csl->map->state & PCS_MAP_ERROR))
 		return;
 
+	if (ireq->flags & IREQ_F_ACCELERROR) {
+		pcs_clear_error(&ireq->error);
+		ireq->flags &= ~IREQ_F_ACCELERROR;
+		ireq->flags |= IREQ_F_NO_ACCEL;
+		return;
+	}
+
 	cs = rcu_dereference_protected(csl->cs[ireq->iochunk.cs_index].cslink.cs,
 				       atomic_read(&csl->refcnt) > 0);
 
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 4bab867477f4..e2b3c14a5b28 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -219,6 +219,8 @@ void pcs_cs_truncate_maps(struct pcs_cs *cs);
 unsigned long pcs_map_shrink_scan(struct shrinker *,  struct shrink_control *sc);
 void ireq_drop_tokens(struct pcs_int_request * ireq);
 
+extern unsigned int cs_io_locality;
+
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,	 (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
 
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 7c86f5dc73d3..68cf2702b2ea 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -53,11 +53,10 @@ struct pcs_aio_req
 	struct iov_iter 	iter;
 	struct work_struct	work;
 
-#define PCS_MAX_INLINE_CRC	2
-	u32    			crcb[PCS_MAX_INLINE_CRC];
 	u32    			*crc;
 	struct file		*cfile;
-	struct work_struct	cwork;
+#define PCS_MAX_INLINE_CRC	32
+	u32    			crcb[PCS_MAX_INLINE_CRC];
 };
 
 struct pcs_int_request
@@ -83,6 +82,8 @@ struct pcs_int_request
 #define IREQ_F_WB_SUSP		0x400
 #define IREQ_F_RECV_SPLICE	0x800
 #define IREQ_F_NO_ACCEL		0x1000
+#define IREQ_F_CRYPT		0x2000
+#define IREQ_F_ACCELERROR	0x4000
 
 	atomic_t		iocount;
 

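Not part of the commit: a hypothetical userspace sketch of how a CS
daemon would now register with encryption enabled, using the extended
ABI from pcs_ioctl.h above. The ioctl command constant and the full
fuse_pcs_ioc_register layout are assumptions here; what the diff does
show is that the key length lives in the low 16 bits of crypto_algo,
the cipher type in bits 16..19, and key_data is a u64 carrying a
user-space pointer (keys up to 64 bytes, i.e. two AES-256 keys for XTS):

    #include <sys/ioctl.h>
    #include <string.h>

    struct fuse_pcs_ioc_register req;
    unsigned char key[64];                  /* two AES-256 keys for XTS */

    memset(&req, 0, sizeof(req));
    req.cluster_id  = my_cluster_id;        /* PCS_CLUSTER_ID_T */
    req.cs_id       = my_cs_id;             /* PCS_NODE_ID_T */
    req.crypto_algo = PCS_CSA_EMASK_XTS | sizeof(key);
    req.key_data    = (unsigned long)key;

    /* FUSE_IOC_KPCS_CSA_REGISTER is a placeholder for the real cmd */
    int fd = ioctl(fuse_fd, FUSE_IOC_KPCS_CSA_REGISTER, &req);
    /* on success, this is the accel control fd from pcs_csa_register() */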
