[Devel] [PATCH RHEL9 COMMIT] fuse: cs acceleration for writes

Konstantin Khorenko khorenko at virtuozzo.com
Wed Nov 1 22:44:54 MSK 2023


The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-284.25.1.vz9.30.8
------>
commit 5472b6c50d7130397c3fa0deb72579231e5506e8
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Fri Oct 6 18:42:55 2023 +0800

    fuse: cs acceleration for writes
    
    If we have a chain of CSes:
    
    D1 -> D2 -> ... -> Dk -> Nk+1 -> ... -> Nn
    
    where the first k D-CSes (D1 .. Dk) are accessible locally and
    Nk+1 .. Nn are accessible only through the network (though some of
    them may happen to be local as well).
    
    We can fan out direct local writes to D1 .. Dk and send a single
    network write to the remaining tail Nk+1 -> ... -> Nn.
    
    It is up to the user-space CS to decide when this is a legal thing to do.
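    
    In outline, the new write submit path works as follows (a simplified
    sketch of pcs_csa_csl_write_submit() from this patch, with refcounting
    and error handling omitted):
    
        /* Fan out local aio writes to the accelerated head of the chain. */
        for (idx = 0; idx < csl->nsrv; idx++)
            if (!csa_cs_submit_write(ireq, idx))
                break;
    
        if (idx == 0)
            return ireq;        /* nothing accelerated: fall back to normal submit */
        if (idx == csl->nsrv)
            return NULL;        /* everything went locally, no network at all */
    
        /* Split off an N-request; the caller sends it to the remaining tail. */
        sreq = pcs_ireq_split(ireq, 0, 1);
        sreq->complete_cb = complete_N_request;
        sreq->iochunk.cs_index = idx;
        return sreq;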
    
    https://pmc.acronis.work/browse/VSTOR-54040
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: vStorage
---
 fs/fuse/kio/pcs/pcs_cs.c       |  32 ++-
 fs/fuse/kio/pcs/pcs_cs.h       |   2 +
 fs/fuse/kio/pcs/pcs_cs_accel.c | 518 +++++++++++++++++++++++++++++++++++++++--
 fs/fuse/kio/pcs/pcs_map.c      |  18 +-
 fs/fuse/kio/pcs/pcs_map.h      |  16 ++
 fs/fuse/kio/pcs/pcs_req.h      |  38 +++
 6 files changed, 592 insertions(+), 32 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index c518cc9792a4..fffbce6110b5 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -291,6 +291,11 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
 	struct pcs_cs_iohdr * h = (struct pcs_cs_iohdr *)msg_inline_head(resp);
 	int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
 
+	if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
+		pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		return;
+	}
+
 	fuse_stat_observe(fc, reqt, ktime_sub(ktime_get(), ireq->ts_sent));
 	if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
 		int n = 1;
@@ -611,10 +616,28 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 
 	BUG_ON(msg->rpc);
 
-	if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
-	    !(ireq->flags & IREQ_F_NO_ACCEL)) {
-		if (pcs_csa_cs_submit(cs, ireq))
-			return;
+	ireq->ts_sent = ktime_get();
+
+	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+			if (pcs_csa_cs_submit(cs, ireq))
+				return;
+		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+			/* Synchronous writes in accel mode are still not supported */
+			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+			    !ireq->dentry->no_write_delay) {
+				struct pcs_int_request * sreq;
+
+				sreq = pcs_csa_csl_write_submit(ireq);
+				if (!sreq)
+					return;
+				if (sreq != ireq) {
+					ireq = sreq;
+					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+					msg = &ireq->iochunk.msg;
+				}
+			}
+		}
 	}
 
 	msg->private = cs;
@@ -686,7 +709,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 		msg->timeout = csl->write_timeout;
 	else
 		msg->timeout = csl->read_timeout;
-	ireq->ts_sent = ktime_get();
 	ireq->wait_origin.val = 0;
 
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0564ef33514d..0b17e924a534 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -214,6 +214,8 @@ u32 pcs_cs_msg_size(u32 size, u32 storage_version);
 struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index fc6a35fb17eb..ae8562eff822 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -146,14 +146,9 @@ void pcs_csa_cs_detach(struct pcs_cs * cs)
 	}
 }
 
-static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+static inline struct pcs_csa_entry * __cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
 {
-	struct pcs_csa_entry * cse;
-
-	rcu_read_lock();
-	cse= radix_tree_lookup(&ctx->tree, chunk_id);
-	rcu_read_unlock();
-	return cse;
+	return radix_tree_lookup(&ctx->tree, chunk_id);
 }
 
 static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
@@ -372,12 +367,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 
 	if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
 		struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-		struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+		struct pcs_csa_context * ctx;
+		rcu_read_lock();
+		ctx = rcu_dereference(cs->csa_ctx);
 		if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
 			ireq->error.remote = 1;
 			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
 			ireq->error.value = PCS_ERR_IO;
 		}
+		rcu_read_unlock();
 	}
 
 	if (areq->crc) {
@@ -564,7 +562,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	BUG_ON(parent->type != PCS_IREQ_API);
 	ar = parent->apireq.req;
 
-	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, READ);
 	if (!iov_iter_is_bvec(it)) {
 		FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
 		return -EINVAL;
@@ -581,7 +579,6 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	/* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
 	atomic_set(&areq->iocount, 2);
 
-	ireq->ts_sent = ktime_get();
 	ret = call_read_iter(file, iocb, it);
 
 	if (unlikely(ret != -EIOCBQUEUED)) {
@@ -637,28 +634,521 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 {
-	struct pcs_csa_context * csa_ctx = rcu_dereference(cs->csa_ctx);
+	struct pcs_csa_context * csa_ctx;
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(cs->csa_ctx);
 
 	if (csa_ctx) {
 		struct pcs_map_entry * map = ireq->iochunk.map;
-		struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+		struct pcs_csa_entry * csa;
+
+		csa = __cse_lookup(csa_ctx, map->id);
 		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
 		    (csa->flags & PCS_CSA_FL_READ)) {
 			/* XXX Paranoia? Verify! */
 			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				struct file * file = get_file(csa->file);
+				struct file * cfile = csa->cfile ? get_file(csa->cfile) : NULL;
+				unsigned int flags = csa->flags;
+				int err;
+
 				if (csa_ctx->tfm)
 					ireq->flags |= IREQ_F_CRYPT;
-				if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
+
+				rcu_read_unlock();
+				err = csa_submit(file, cfile, flags&PCS_CSA_FL_CSUM, ireq);
+				fput(file);
+				if (cfile)
+					fput(cfile);
+				if (!err)
 					return 1;
+				rcu_read_lock();
 				/* Clear state which could be rewritten by csa_submit */
 				ireq->iochunk.msg.destructor = NULL;
 				ireq->iochunk.msg.rpc = NULL;
 			}
 		}
 	}
+	rcu_read_unlock();
 	return 0;
 }
 
+/* Write engine. It is similar to the read engine and the code could be merged.
+ * In fact the situation with nsrv=1 is exactly the same, but reads can be
+ * optimized a lot better and we do not want to lose that advantage.
+ *
+ * Terminology:
+ *  Original ireq - ireq which is supposed to be submitted to the head of the cs chain
+ *   D-request    - replicas at the head of the chain which have accelerated mappings
+ *                  and are eligible for local aio processing.
+ *                  They are represented as struct pcs_accel_write_req entries stored
+ *                  in the array awr[i] of struct pcs_accel_req in the original ireq.iochunk.acr
+ *   N-request    - request to be submitted to the tail of the cs chain following the last
+ *                  D-request. It is represented as a clone of the original ireq with an
+ *                  overridden completion callback, so that its errors are not processed
+ *                  but copied to the original ireq to be processed on its completion.
+ */
+
+static void ireq_init_acr(struct pcs_int_request * ireq)
+{
+	ireq->iochunk.parent_N = NULL;
+	atomic_set(&ireq->iochunk.acr.iocount, 1);
+	ireq->iochunk.acr.num_awr = 0;
+	ireq->iochunk.acr.num_iotimes = 0;
+}
+
+static void ireq_clear_acr(struct pcs_int_request * ireq)
+{
+	int i, n;
+
+	for (i = 0; i < ireq->iochunk.acr.num_awr; i++) {
+		struct bio_vec * bvec = ireq->iochunk.acr.awr[i].bvec_copy;
+		if (bvec) {
+			for (n = ireq->iochunk.acr.awr[i].num_copy_bvecs-1; n>=0; n--) {
+				if (bvec[n].bv_page)
+					put_page(bvec[n].bv_page);
+			}
+			kfree(bvec);
+			ireq->iochunk.acr.awr[i].bvec_copy = NULL;
+		}
+	}
+	ireq->iochunk.msg.destructor = NULL;
+	ireq->iochunk.msg.rpc = NULL;
+	ireq->iochunk.parent_N = NULL;
+	ireq->flags |= IREQ_F_NO_ACCEL;
+}
+
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id)
+{
+	int idx = ireq->iochunk.acr.num_awr;
+	struct pcs_cs_sync_resp * srec;
+
+	ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
+	ireq->iochunk.acr.io_times[idx].sync = h->sync;
+
+	for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
+	     (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
+	     srec++, idx++)
+		ireq->iochunk.acr.io_times[idx] = *srec;
+
+	ireq->iochunk.acr.num_iotimes = idx;
+}
+
+static void __complete_acr_work(struct work_struct * w)
+{
+	struct pcs_int_request * ireq = container_of(w, struct pcs_int_request, iochunk.acr.work);
+
+	if (pcs_if_error(&ireq->error)) {
+		FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+		      ireq->error.value,
+		      ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+		      ireq, (unsigned long long)ireq->iochunk.chunk,
+		      (unsigned)ireq->iochunk.offset,
+		      (unsigned)ireq->iochunk.size);
+	} else {
+		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+			int n = ireq->iochunk.acr.num_iotimes;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       n*sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+				int i;
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_WRITE_AL_RESP;
+				th->cses = n;
+
+				for (i = 0; i < n; i++) {
+					ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
+					ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
+					ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
+					ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
+					ch++;
+				}
+			}
+		}
+		FUSE_TRACE_COMMIT(fc->ktrace);
+	}
+
+	ireq_clear_acr(ireq);
+	/* This will either complete or retry the whole request */
+	ireq_complete(ireq);
+}
+
+static inline void csa_complete_acr(struct pcs_int_request * ireq)
+{
+	if (atomic_dec_and_test(&ireq->iochunk.acr.iocount)) {
+		INIT_WORK(&ireq->iochunk.acr.work, __complete_acr_work);
+		queue_work(ireq->cc->wq, &ireq->iochunk.acr.work);
+	}
+}
+
+
+static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
+{
+	struct pcs_int_request * ireq;
+
+	fput(areq->iocb.ki_filp);
+
+	ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+	if (!pcs_if_error(&ireq->error)) {
+		struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
+		sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+		sresp->sync.ts_net = 0;
+		sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+	}
+
+	csa_complete_acr(ireq);
+}
+
+static void csa_write_complete_work(struct work_struct *w)
+{
+	struct pcs_accel_write_req * areq = container_of(w, struct pcs_accel_write_req, work);
+
+	__pcs_csa_write_final_completion(areq);
+}
+
+static void csa_write_complete(struct kiocb *iocb, long ret)
+{
+	struct pcs_accel_write_req * areq;
+	struct pcs_int_request * ireq;
+
+	areq = container_of(iocb, struct pcs_accel_write_req, iocb);
+	ireq = container_of(areq-areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+	if (ret != ireq->iochunk.size) {
+		if (!ireq->error.value) {
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+			ireq->error.value = PCS_ERR_IO;
+			ireq->flags |= IREQ_F_ACCELERROR;
+		}
+	}
+
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_write_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+}
+
+static void encrypt_page_ctr(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+			     unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+	struct scatterlist sgi, sgo;
+	struct { u64 a, b; } iv;
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sgi, 1);
+	sg_init_table(&sgo, 1);
+
+	iv.a = chunk_id;
+	iv.b = cpu_to_be64(pos / 16);
+	sg_set_page(&sgi, src, len, offset);
+	sg_set_page(&sgo, dst, len, offset);
+	skcipher_request_set_crypt(req, &sgi, &sgo, len, &iv);
+	crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+}
+
+static void encrypt_page_xts(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+			     unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+	struct scatterlist sgi, sgo;
+	struct { u64 a, b; } iv;
+	SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+	skcipher_request_set_sync_tfm(req, tfm);
+	skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+	sg_init_table(&sgi, 1);
+	sg_init_table(&sgo, 1);
+
+	for ( ; len > 0; len -= 512) {
+		iv.a = pos / 512;
+		iv.b = chunk_id;
+		sg_set_page(&sgi, src, 512, offset);
+		sg_set_page(&sgo, dst, 512, offset);
+		skcipher_request_set_crypt(req, &sgi, &sgo, 512, &iv);
+		crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+		pos += 512;
+		offset += 512;
+	}
+}
+
+static int init_crypted_data(struct pcs_int_request * ireq, int idx)
+{
+	struct pcs_int_request *parent = ireq->completion_data.parent;
+	struct pcs_fuse_req * r;
+	struct bio_vec * bvec;
+	int n, nvec;
+	u64 pos;
+	u64 chunk_id;
+	struct pcs_csa_context * csa_ctx;
+	struct crypto_sync_skcipher * tfm;
+
+	BUG_ON(parent->type != PCS_IREQ_API);
+	r = parent->apireq.req->datasource;
+
+	nvec = r->exec.io.num_bvecs;
+
+	/* XXX oops, this can sleep and tfm can be destroyed. Do we need a refcount?
+	 * Seems not: we just have to refetch tfm from the cs after the allocations,
+	 * failing if it has already been destroyed.
+	 */
+	bvec = kmalloc(sizeof(struct bio_vec) * nvec, GFP_NOIO);
+	if (!bvec)
+		return -ENOMEM;
+
+	for (n = 0; n < nvec; n++) {
+		bvec[n] = r->exec.io.bvec[n];
+		if ((bvec[n].bv_offset|bvec[n].bv_len)&511)
+			goto out;
+		bvec[n].bv_page = alloc_page(GFP_NOIO);
+		if (!bvec[n].bv_page)
+			goto out;
+	}
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(ireq->iochunk.csl->cs[idx].cslink.cs->csa_ctx);
+	if (!csa_ctx || ((tfm = rcu_dereference(csa_ctx->tfm)) == NULL)) {
+		rcu_read_unlock();
+		goto out;
+	}
+
+	pos = ireq->iochunk.offset;
+	chunk_id = ireq->iochunk.map->id;
+	for (n = 0; n < nvec; n++) {
+		if (tfm->base.base.__crt_alg->cra_priority == 400)
+			encrypt_page_ctr(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+		else
+			encrypt_page_xts(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+		pos += bvec[n].bv_len;
+	}
+	rcu_read_unlock();
+
+	ireq->iochunk.acr.awr[idx].bvec_copy = bvec;
+	ireq->iochunk.acr.awr[idx].num_copy_bvecs = n;
+	return 0;
+
+out:
+	while (--n >= 0)
+		put_page(bvec[n].bv_page);
+	kfree(bvec);
+	return -ENOMEM;
+}
+
+static inline int csa_submit_write(struct file * file, struct pcs_int_request * ireq, int idx, int do_crypt)
+{
+	struct pcs_accel_write_req * areq =  &ireq->iochunk.acr.awr[idx];
+	struct kiocb * iocb = &areq->iocb;
+	struct iov_iter iter;
+	struct iov_iter * it = &iter; /* Just to use this pointer instead of &iter */
+	unsigned int size = ireq->iochunk.size;
+	int ret;
+
+	if (do_crypt) {
+		if (init_crypted_data(ireq, idx))
+			return -EINVAL;
+		iov_iter_bvec(it, WRITE, areq->bvec_copy, areq->num_copy_bvecs, size);
+	} else {
+		struct pcs_int_request *parent = ireq->completion_data.parent;
+		pcs_api_iorequest_t *ar;
+
+		areq->bvec_copy = NULL;
+		BUG_ON(parent->type != PCS_IREQ_API);
+		ar = parent->apireq.req;
+		ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, WRITE);
+		if (!iov_iter_is_bvec(it)) {
+			FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+			return -EINVAL;
+		}
+		iov_iter_truncate(it, size);
+	}
+
+	iocb->ki_pos = ireq->iochunk.offset;
+	iocb->ki_filp = get_file(file);
+	iocb->ki_complete = csa_write_complete;
+	iocb->ki_flags = IOCB_DIRECT;
+	iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+	/* One ref is ours, other is for AIO. */
+	atomic_set(&areq->iocount, 2);
+	atomic_inc(&ireq->iochunk.acr.iocount);
+	areq->index = idx;
+	ireq->iochunk.acr.num_awr = idx + 1;
+
+	ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+
+	ret = call_write_iter(file, iocb, it);
+
+	if (unlikely(ret != -EIOCBQUEUED)) {
+		if (ret != size) {
+			if (!ireq->error.value) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+				ireq->error.value = PCS_ERR_IO;
+			}
+
+			/* Do not drop refs, we do not want to complete ireq. */
+			fput(areq->iocb.ki_filp);
+			FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+				    ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+				    ireq, (unsigned long long)ireq->iochunk.chunk,
+				    (unsigned)ireq->iochunk.offset,
+				    (unsigned)size);
+			if (atomic_dec_and_test(&ireq->iochunk.acr.iocount))
+				BUG();
+			return ret >= 0 ? -EIO : ret;
+		}
+
+		/* IO already finished. Drop the AIO refcnt. */
+		FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+			    ireq, (unsigned long long)ireq->iochunk.chunk,
+			    (unsigned)ireq->iochunk.offset,
+			    (unsigned)size);
+
+		if (atomic_dec_and_test(&areq->iocount))
+			BUG();
+	}
+
+	if (atomic_dec_and_test(&areq->iocount)) {
+		INIT_WORK(&areq->work, csa_write_complete_work);
+		queue_work(ireq->cc->wq, &areq->work);
+	}
+	return 0;
+}
+
+static int csa_cs_submit_write(struct pcs_int_request * ireq, int idx)
+{
+	struct pcs_cs * cs = ireq->iochunk.csl->cs[idx].cslink.cs;
+	struct pcs_csa_context * csa_ctx;
+
+	if (idx >= PCS_MAX_ACCEL_CS)
+		return 0;
+
+	rcu_read_lock();
+	csa_ctx = rcu_dereference(cs->csa_ctx);
+	if (csa_ctx) {
+		struct pcs_map_entry * map = ireq->iochunk.map;
+		struct pcs_csa_entry * csa = __cse_lookup(csa_ctx, map->id);
+		if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+		    (csa->flags & PCS_CSA_FL_WRITE)) {
+			/* XXX Paranoia? Verify! */
+			if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+				struct file * file = get_file(csa->file);
+				int do_crypt = (csa_ctx->tfm != NULL);
+				int err;
+
+				rcu_read_unlock();
+				err = csa_submit_write(file, ireq, idx, do_crypt);
+				fput(file);
+				return !err;
+			}
+		}
+	}
+	rcu_read_unlock();
+	return 0;
+}
+
+static void complete_N_request(struct pcs_int_request * sreq)
+{
+	struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+	if (pcs_if_error(&sreq->error)) {
+		/* Error on N-request overrides any error on a D-request. */
+		ireq->error = sreq->error;
+		ireq->flags |= IREQ_F_NO_ACCEL;
+		/* Clear ACCELERROR to deliver this error normally, through invalidating the map */
+		ireq->flags &= ~IREQ_F_ACCELERROR;
+	}
+
+	/* And free all clone resources */
+	pcs_sreq_detach(sreq);
+	if (sreq->iochunk.map)
+		pcs_map_put(sreq->iochunk.map);
+	if (sreq->iochunk.csl)
+		cslist_put(sreq->iochunk.csl);
+	if (sreq->iochunk.flow)
+		pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+	ireq_destroy(sreq);
+
+	csa_complete_acr(ireq);
+}
+
+
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
+{
+	int idx;
+	struct pcs_cs_list *csl = ireq->iochunk.csl;
+
+	if (csl->nsrv > PCS_MAX_ACCEL_CS)
+		return ireq;
+
+	ireq_init_acr(ireq);
+
+	for (idx = 0; idx < csl->nsrv; idx++) {
+		if (!csa_cs_submit_write(ireq, idx))
+			break;
+	}
+
+	if (idx == 0) {
+		/* Nothing was handled. Just proceed to normal submit */
+		ireq_clear_acr(ireq);
+		return ireq;
+	} else if (idx >= csl->nsrv) {
+		/* Everything went locally. No network at all. */
+		ireq->iochunk.acr.num_iotimes = idx;
+		csa_complete_acr(ireq);
+		return NULL;
+	} else {
+		/* Harder case. We have to transmit to tail replicas */
+		struct pcs_int_request * sreq = pcs_ireq_split(ireq, 0, 1);
+		if (sreq == NULL) {
+			/* Some D replicas have already been submitted,
+			 * so we have to go through the error cycle.
+			 */
+			ireq->error.remote = 1;
+			ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+			ireq->error.value = PCS_ERR_NORES;
+			csa_complete_acr(ireq);
+			return NULL;
+		}
+
+		ireq->iochunk.acr.num_iotimes = idx;
+
+		/* ireq_split does not copy size and csl */
+		sreq->iochunk.size = ireq->iochunk.size;
+		sreq->iochunk.csl = ireq->iochunk.csl;
+		cslist_get(ireq->iochunk.csl);
+		/* Yet this sreq is not actually accounted; the accounting is done for the original ireq */
+		sreq->flags |= IREQ_F_NOACCT;
+		sreq->complete_cb = complete_N_request;
+		sreq->iochunk.parent_N = ireq;
+		sreq->iochunk.cs_index = idx;
+
+		/* Our original iocount ref goes to the N-request.
+		 * Proceed with sending sreq to the tail of the cs chain.
+		 */
+		return sreq;
+	}
+}
+
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
 	struct pcs_csa_context *ctx = file->private_data;
@@ -717,6 +1207,7 @@ static int csa_release(struct inode *inode, struct file *file)
 	struct pcs_cs * cs;
 
 	ctx->dead = 1;
+	rcu_read_lock();
 	if ((cs = rcu_dereference(ctx->cs)) != NULL) {
 		spin_lock(&cs->lock);
 		if (ctx->cs == cs) {
@@ -728,6 +1219,7 @@ static int csa_release(struct inode *inode, struct file *file)
 		}
 		spin_unlock(&cs->lock);
 	}
+	rcu_read_unlock();
 	wake_up_poll(&ctx->wqh, EPOLLHUP);
 	pcs_csa_put(ctx);
 	return 0;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 9dc1c95733fd..634775b3cb9e 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -70,7 +70,7 @@ static inline unsigned int pcs_sync_timeout(struct pcs_cluster_core *cc)
 	return PCS_SYNC_TIMEOUT;
 }
 
-static void cslist_destroy(struct pcs_cs_list * csl)
+void cslist_destroy(struct pcs_cs_list * csl)
 {
 	int i;
 
@@ -97,19 +97,6 @@ static void cslist_destroy(struct pcs_cs_list * csl)
 	kfree(csl);
 }
 
-static inline void cslist_get(struct pcs_cs_list * csl)
-{
-	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-
-	atomic_inc(&csl->refcnt);
-}
-static inline void cslist_put(struct pcs_cs_list * csl)
-{
-	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-	if (atomic_dec_and_test(&csl->refcnt))
-		cslist_destroy(csl);
-}
-
 static void map_drop_cslist(struct pcs_map_entry * m)
 {
 	assert_spin_locked(&m->lock);
@@ -1579,6 +1566,9 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
 	unsigned long long match_id = 0;
 	struct pcs_cs_list * csl, ** csl_p = 0;
 
+	if (ireq->flags & IREQ_F_NOACCT)
+		return;
+
 	switch (ireq->type) {
 	case PCS_IREQ_IOCHUNK:
 		csl_p = &ireq->iochunk.csl;
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index e2b3c14a5b28..bfe0719eebe2 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -221,6 +221,22 @@ void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 extern unsigned int cs_io_locality;
 
+void cslist_destroy(struct pcs_cs_list * csl);
+
+static inline void cslist_get(struct pcs_cs_list * csl)
+{
+	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+
+	atomic_inc(&csl->refcnt);
+}
+
+static inline void cslist_put(struct pcs_cs_list * csl)
+{
+	TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+	if (atomic_dec_and_test(&csl->refcnt))
+		cslist_destroy(csl);
+}
+
 #define MAP_FMT	"(%p) 0x%lld s:%x" DENTRY_FMT
 #define MAP_ARGS(m) (m), (long long)(m)->index,	 (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
 
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 68cf2702b2ea..8ee32b33f3f0 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -59,6 +59,40 @@ struct pcs_aio_req
 	u32    			crcb[PCS_MAX_INLINE_CRC];
 };
 
+#define PCS_MAX_ACCEL_CS	3
+
+struct pcs_accel_write_req
+{
+	int			index;
+	struct kiocb		iocb;
+	atomic_t		iocount;
+	struct work_struct	work;
+
+	/* Crypto bits. This holds an encrypted copy of original data for use by aio writes */
+	struct bio_vec		*bvec_copy;
+	unsigned		num_copy_bvecs;
+};
+
+struct pcs_accel_req
+{
+	struct pcs_int_request		*parent;
+	atomic_t			iocount;
+	int				num_awr;
+	struct pcs_accel_write_req	awr[PCS_MAX_ACCEL_CS];
+	int				num_iotimes;
+	struct pcs_cs_sync_resp		io_times[PCS_MAX_ACCEL_CS];
+	struct work_struct		work;
+};
+
+struct pcs_iochunk_req {
+	struct pcs_msg		msg;
+	struct pcs_cs_iohdr	hbuf;		/* Buffer for header.
+						 * A little ugly
+						 */
+	struct kvec		hbuf_kv;
+	struct pcs_int_request	*parent_N;
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -84,6 +118,7 @@ struct pcs_int_request
 #define IREQ_F_NO_ACCEL		0x1000
 #define IREQ_F_CRYPT		0x2000
 #define IREQ_F_ACCELERROR	0x4000
+#define IREQ_F_NOACCT		0x8000
 
 	atomic_t		iocount;
 
@@ -141,8 +176,11 @@ struct pcs_int_request
 										 * A little ugly
 										 */
 					struct kvec		hbuf_kv;
+					struct pcs_int_request	*parent_N;
 				};
+				struct pcs_iochunk_req		ir;
 				struct pcs_aio_req		ar;
+				struct pcs_accel_req		acr;
 			};
 		} iochunk;
 

