[Devel] [PATCH RHEL9 COMMIT] fuse: implement fanout writes

Konstantin Khorenko khorenko at virtuozzo.com
Wed Nov 1 22:45:16 MSK 2023


The commit is pushed to "branch-rh9-5.14.0-284.25.1.vz9.30.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-284.25.1.vz9.30.8
------>
commit db885ba37bd6cbc00dca9f656fbdd64801855996
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Fri Oct 6 18:43:01 2023 +0800

    fuse: implement fanout writes
    
    Module parameter cs_enable_fanout=1 is required.
    
    When fanout is allowed, writes are not sent along the chain of CSes;
    instead, each CS is written point to point. This works especially well
    with kernel-accelerated CSes, which were already partly fanout.
    
    This can be useful when the network is very fast and sending three
    requests instead of one over the same network is not a problem. It also
    decreases the load on CSes, since they no longer forward data along the
    chain, but it increases the load on the client. Whether it pays off
    depends on the workload.
    
    https://pmc.acronis.work/browse/VSTOR-54040
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: vStorage
---
 fs/fuse/kio/pcs/pcs_cs.c         | 214 ++++++++++++++++++++++++++++++++++-----
 fs/fuse/kio/pcs/pcs_cs.h         |   1 +
 fs/fuse/kio/pcs/pcs_cs_accel.c   |  61 +++++++----
 fs/fuse/kio/pcs/pcs_cs_prot.h    |  14 ++-
 fs/fuse/kio/pcs/pcs_map.c        |   5 +
 fs/fuse/kio/pcs/pcs_map.h        |   1 +
 fs/fuse/kio/pcs/pcs_prot_types.h |   1 +
 fs/fuse/kio/pcs/pcs_req.h        |  14 ++-
 8 files changed, 261 insertions(+), 50 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index fffbce6110b5..59b752dfdc8c 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -292,7 +292,22 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
 	int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
 
 	if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
-		pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		if (!(ireq->flags & IREQ_F_FANOUT)) {
+			pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		} else {
+			int idx = ireq->iochunk.cs_index;
+			struct pcs_int_request * parent = ireq->iochunk.parent_N;
+
+			parent->iochunk.fo.io_times[idx].csid = resp->rpc->peer_id.val;
+			/* XXX kio does not implement flow detection (for now), so it
+			 * never sets PCS_CS_IO_SEQ on its own. Reuse that bit here to
+			 * mark the per-CS io times of a fanout write.
+			 */
+			parent->iochunk.fo.io_times[idx].misc = h->sync.misc | PCS_CS_IO_SEQ;
+			parent->iochunk.fo.io_times[idx].ts_net = h->sync.ts_net;
+			parent->iochunk.fo.io_times[idx].ts_io = h->sync.ts_io;
+
+		}
 		return;
 	}
 
@@ -605,41 +620,105 @@ static void cs_sent(struct pcs_msg *msg)
 	pcs_rpc_sent(msg);
 }
 
-void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+static void ireq_complete_fo(struct pcs_int_request * ireq)
 {
-	struct pcs_msg *msg = &ireq->iochunk.msg;
-	struct pcs_cs_iohdr *ioh;
-	struct pcs_cs_list *csl = ireq->iochunk.csl;
-	struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference to map */
-	int storage_version = atomic_read(&ireq->cc->storage_version);
-	int aligned_msg;
+	if (!atomic_dec_and_test(&ireq->iochunk.fo.iocount))
+		return;
 
-	BUG_ON(msg->rpc);
+	if (pcs_if_error(&ireq->error)) {
+		FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+		      ireq->error.value,
+		      ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+		      ireq, (unsigned long long)ireq->iochunk.chunk,
+		      (unsigned)ireq->iochunk.offset,
+		      (unsigned)ireq->iochunk.size);
+	} else if (ireq->iochunk.parent_N) {
+		struct pcs_int_request * parent = ireq->iochunk.parent_N;
+		int n = ireq->iochunk.fo.num_iotimes;
+		int idx = ireq->iochunk.cs_index;
 
-	ireq->ts_sent = ktime_get();
+		if (idx < parent->iochunk.acr.num_awr)
+			idx = parent->iochunk.acr.num_awr;
 
-	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
-		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
-			if (pcs_csa_cs_submit(cs, ireq))
-				return;
-		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
-			/* Synchronous writes in accel mode are still not supported */
-			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
-			    !ireq->dentry->no_write_delay) {
-				struct pcs_int_request * sreq;
+		if (n > PCS_MAX_ACCEL_CS)
+			n = PCS_MAX_ACCEL_CS;
 
-				sreq = pcs_csa_csl_write_submit(ireq);
-				if (!sreq)
-					return;
-				if (sreq != ireq) {
-					ireq = sreq;
-					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-					msg = &ireq->iochunk.msg;
+		if (n > idx) {
+			memcpy(&parent->iochunk.acr.io_times[idx], &ireq->iochunk.fo.io_times[idx],
+			       (n - idx) * sizeof(struct fuse_tr_iotimes_cs));
+		}
+		parent->iochunk.acr.num_iotimes = n;
+	} else {
+		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+			int n = ireq->iochunk.fo.num_iotimes;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       n*sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+				int i;
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_WRITE_AL_RESP;
+				th->cses = n;
+
+				for (i = 0; i < n; i++, ch++) {
+					*ch = ireq->iochunk.fo.io_times[i];
 				}
 			}
 		}
+		FUSE_TRACE_COMMIT(fc->ktrace);
+	}
+	ireq->iochunk.msg.destructor = NULL;
+	ireq->iochunk.msg.rpc = NULL;
+	ireq_complete(ireq);
+}
+
+static void complete_fo_request(struct pcs_int_request * sreq)
+{
+	struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+	if (pcs_if_error(&sreq->error)) {
+		if (!pcs_if_error(&ireq->error))
+			ireq->error = sreq->error;
 	}
 
+	/* And free all clone resources */
+	pcs_sreq_detach(sreq);
+	if (sreq->iochunk.map)
+		pcs_map_put(sreq->iochunk.map);
+	if (sreq->iochunk.csl)
+		cslist_put(sreq->iochunk.csl);
+	if (sreq->iochunk.flow)
+		pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+	ireq_destroy(sreq);
+
+	ireq_complete_fo(ireq);
+}
+
+static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+	struct pcs_msg *msg = &ireq->iochunk.msg;
+	struct pcs_cs_iohdr *ioh;
+	struct pcs_cs_list *csl = ireq->iochunk.csl;
+	struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference to map */
+	int storage_version = atomic_read(&ireq->cc->storage_version);
+	int aligned_msg;
+
+	BUG_ON(msg->rpc);
+
 	msg->private = cs;
 	msg->private2 = ireq;
 
@@ -685,6 +764,8 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
                 ioh->sync.misc |= PCS_CS_IO_NOCSUM;
 	if ((ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) || ireq->dentry->no_write_delay)
 		ioh->sync.misc |= PCS_CS_IO_SYNC;
+	if (ireq->flags & IREQ_F_FANOUT)
+		ioh->sync.misc |= PCS_CS_IO_FANOUT; /* keep NOCSUM/SYNC bits set above */
 
 	msg->size = ioh->hdr.len;
 	msg->rpc = NULL;
@@ -731,6 +812,87 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	pcs_rpc_queue(cs->rpc, msg);
 }
 
+static inline int eligible_for_fanout(struct pcs_int_request * ireq)
+{
+	return (cs_enable_fanout && pcs_cs_fanout(atomic_read(&ireq->cc->storage_version)) &&
+		ireq->iochunk.csl->nsrv <= PCS_MAP_MAX_FO_CS &&
+		ireq->iochunk.cs_index + 1 < ireq->iochunk.csl->nsrv &&
+		!(ireq->iochunk.csl->flags & CS_FL_REPLICATING));
+}
+
+void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+	ireq->ts_sent = ktime_get();
+
+	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+			if (pcs_csa_cs_submit(cs, ireq))
+				return;
+		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+			/* Synchronous writes in accel mode are still not supported */
+			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+			    !ireq->dentry->no_write_delay) {
+				struct pcs_int_request * sreq;
+
+				sreq = pcs_csa_csl_write_submit(ireq);
+				if (!sreq)
+					return;
+				if (sreq != ireq) {
+					ireq = sreq;
+					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+				}
+			}
+		}
+	}
+
+	if (ireq->iochunk.cmd == PCS_REQ_T_WRITE && eligible_for_fanout(ireq)) {
+		int idx = ireq->iochunk.cs_index;
+		struct pcs_cs_list * csl = ireq->iochunk.csl;
+
+		atomic_set(&ireq->iochunk.fo.iocount, 1);
+		ireq->iochunk.fo.num_iotimes = csl->nsrv;
+
+		for (; idx < csl->nsrv; idx++) {
+			struct pcs_int_request * sreq;
+
+			sreq = pcs_ireq_split(ireq, 0, 1);
+			if (sreq == NULL) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+				ireq->error.value = PCS_ERR_NORES;
+				ireq_complete_fo(ireq);
+				return;
+			}
+
+			sreq->iochunk.size = ireq->iochunk.size;
+			sreq->iochunk.csl = ireq->iochunk.csl;
+			cslist_get(ireq->iochunk.csl);
+			sreq->flags |= IREQ_F_NOACCT|IREQ_F_FANOUT;
+			sreq->complete_cb = complete_fo_request;
+			sreq->iochunk.parent_N = ireq;
+			sreq->iochunk.cs_index = idx;
+			atomic_inc(&ireq->iochunk.fo.iocount);
+
+			/* The head of the remaining chain is always sent over the
+			 * network. For the other CSes try a direct accelerated write
+			 * first unless the write is synchronous, unaligned or accel is
+			 * disabled; this covers a local cs that is not at chain head.
+			 */
+			if (idx == ireq->iochunk.cs_index ||
+			    (ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) ||
+			    ireq->dentry->no_write_delay ||
+			    ((ireq->iochunk.size|ireq->iochunk.offset) & 511) ||
+			    (ireq->flags & IREQ_F_NO_ACCEL) ||
+			    !pcs_csa_csl_write_submit_single(sreq, idx))
+				do_cs_submit(ireq->iochunk.csl->cs[idx].cslink.cs, sreq);
+		}
+		ireq_complete_fo(ireq);
+		return;
+	}
+
+	do_cs_submit(cs, ireq);
+}
+
 static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 {
 	struct pcs_cs *who;
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0b17e924a534..61be99a54157 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -215,6 +215,7 @@ struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
 void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index ae8562eff822..3b75571e9b94 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -692,7 +692,6 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 
 static void ireq_init_acr(struct pcs_int_request * ireq)
 {
-	ireq->iochunk.parent_N = NULL;
 	atomic_set(&ireq->iochunk.acr.iocount, 1);
 	ireq->iochunk.acr.num_awr = 0;
 	ireq->iochunk.acr.num_iotimes = 0;
@@ -715,7 +714,6 @@ static void ireq_clear_acr(struct pcs_int_request * ireq)
 	}
 	ireq->iochunk.msg.destructor = NULL;
 	ireq->iochunk.msg.rpc = NULL;
-	ireq->iochunk.parent_N = NULL;
 	ireq->flags |= IREQ_F_NO_ACCEL;
 }
 
@@ -724,13 +722,19 @@ void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr *
 	int idx = ireq->iochunk.acr.num_awr;
 	struct pcs_cs_sync_resp * srec;
 
-	ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
-	ireq->iochunk.acr.io_times[idx].sync = h->sync;
+	ireq->iochunk.acr.io_times[idx].csid = cs_id.val;
+	ireq->iochunk.acr.io_times[idx].misc = h->sync.misc;
+	ireq->iochunk.acr.io_times[idx].ts_net = h->sync.ts_net;
+	ireq->iochunk.acr.io_times[idx].ts_io = h->sync.ts_io;
 
 	for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
 	     (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
-	     srec++, idx++)
-		ireq->iochunk.acr.io_times[idx] = *srec;
+	     srec++, idx++) {
+		ireq->iochunk.acr.io_times[idx].csid = srec->cs_id.val;
+		ireq->iochunk.acr.io_times[idx].misc = srec->sync.misc;
+		ireq->iochunk.acr.io_times[idx].ts_net = srec->sync.ts_net;
+		ireq->iochunk.acr.io_times[idx].ts_io = srec->sync.ts_io;
+	}
 
 	ireq->iochunk.acr.num_iotimes = idx;
 }
@@ -746,6 +750,13 @@ static void __complete_acr_work(struct work_struct * w)
 		      ireq, (unsigned long long)ireq->iochunk.chunk,
 		      (unsigned)ireq->iochunk.offset,
 		      (unsigned)ireq->iochunk.size);
+	} else if (ireq->iochunk.parent_N) {
+		struct pcs_int_request * parent = ireq->iochunk.parent_N;
+		int idx = ireq->iochunk.cs_index;
+
+		WARN_ON(!(ireq->flags & IREQ_F_FANOUT));
+		parent->iochunk.fo.io_times[idx] = ireq->iochunk.acr.io_times[idx];
+		parent->iochunk.fo.io_times[idx].misc |= PCS_CS_IO_SEQ;
 	} else {
 		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
 
@@ -772,13 +783,8 @@ static void __complete_acr_work(struct work_struct * w)
 				th->type = PCS_CS_WRITE_AL_RESP;
 				th->cses = n;
 
-				for (i = 0; i < n; i++) {
-					ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
-					ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
-					ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
-					ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
-					ch++;
-				}
+				for (i = 0; i < n; i++, ch++)
+					*ch = ireq->iochunk.acr.io_times[i];
 			}
 		}
 		FUSE_TRACE_COMMIT(fc->ktrace);
@@ -807,10 +813,11 @@ static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
 	ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
 
 	if (!pcs_if_error(&ireq->error)) {
-		struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
-		sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
-		sresp->sync.ts_net = 0;
-		sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+		struct fuse_tr_iotimes_cs * th = &ireq->iochunk.acr.io_times[areq->index];
+		th->csid = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+		th->ts_net = 0;
+		th->ts_io = ktime_to_us(ktime_get()) - th->misc;
+		th->misc &= PCS_CS_TS_MASK;
 	}
 
 	csa_complete_acr(ireq);
@@ -992,7 +999,7 @@ static inline int csa_submit_write(struct file * file, struct pcs_int_request *
 	areq->index = idx;
 	ireq->iochunk.acr.num_awr = idx + 1;
 
-	ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+	ireq->iochunk.acr.io_times[idx].misc = ktime_to_us(ktime_get());
 
 	ret = call_write_iter(file, iocb, it);
 
@@ -1090,7 +1097,6 @@ static void complete_N_request(struct pcs_int_request * sreq)
 	csa_complete_acr(ireq);
 }
 
-
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
 {
 	int idx;
@@ -1149,6 +1155,23 @@ struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
 }
 
 
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx)
+{
+	if (idx >= PCS_MAX_ACCEL_CS)
+		return 0;
+
+	ireq_init_acr(ireq);
+
+	if (!csa_cs_submit_write(ireq, idx)) {
+		ireq_clear_acr(ireq);
+		return 0;
+	}
+
+	ireq->iochunk.acr.num_iotimes = idx;
+	csa_complete_acr(ireq);
+	return 1;
+}
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
 	struct pcs_csa_context *ctx = file->private_data;
diff --git a/fs/fuse/kio/pcs/pcs_cs_prot.h b/fs/fuse/kio/pcs/pcs_cs_prot.h
index 6e0d37157cb0..c72066de7be4 100644
--- a/fs/fuse/kio/pcs/pcs_cs_prot.h
+++ b/fs/fuse/kio/pcs/pcs_cs_prot.h
@@ -40,10 +40,13 @@ struct pcs_cs_sync_data
 #define PCS_CS_IO_NOCSUM	(1ULL<<61)	/* Req: skip crc verification */
 #define PCS_CS_IO_SYNC		(1ULL<<60)	/* Req: DSYNC request */
 #define PCS_CS_IO_BACKGROUND	(1ULL<<59)	/* Req: low priority request */
+#define PCS_CS_IO_FANOUT	(1ULL<<58)	/* Req: request must not be forwarded */
+#define PCS_CS_IO_CLEAR		(1ULL<<57)	/* Req/resp: indicator that write is done to stable chain */
 
-#define PCS_CS_RESET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_SET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((sdata)->misc & ~0xFFFFFFFFFFFFFULL) | ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_ADD_TS_RECV(sdata, ts)	do { (sdata)->misc |= ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
+#define PCS_CS_TS_MASK		0xFFFFFFFFFFFFFULL
+#define PCS_CS_RESET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((u64)ts & PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_SET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((sdata)->misc & ~PCS_CS_TS_MASK) | ((u64)ts & PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_ADD_TS_RECV(sdata, ts)	do { (sdata)->misc |= ((u64)ts & PCS_CS_TS_MASK); } while (0)
 #define PCS_CS_GET_TS_RECV(sdata)	((sdata)->misc & 0xFFFFFFFFFFFFFULL)
 
 struct pcs_cs_sync_resp {
@@ -84,6 +87,11 @@ static inline int pcs_cs_use_aligned_io(u32 storage_version)
 	return (storage_version >= PCS_CS_MSG_ALIGNED_VERSION);
 }
 
+static inline int pcs_cs_fanout(u32 storage_version)
+{
+	return (storage_version >= PCS_CS_FANOUT);
+}
+
 /* Maximal message size. Actually, random */
 #define PCS_CS_MSG_MAX_SIZE	(1024*1024 + sizeof(struct pcs_cs_iohdr))
 
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 634775b3cb9e..29ed4fab4290 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -42,6 +42,10 @@ unsigned int cs_io_locality = 0;
 module_param(cs_io_locality, uint, 0644);
 MODULE_PARM_DESC(cs_io_locality, "CS IO locality");
 
+unsigned int cs_enable_fanout = 0;
+module_param(cs_enable_fanout, uint, 0644);
+MODULE_PARM_DESC(cs_enable_fanout, "Enable CS fanout");
+
 static struct pcs_cs_list *cs_link_to_cs_list(struct pcs_cs_link *csl)
 {
 	struct pcs_cs_record *cs_rec;
@@ -976,6 +980,7 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 
 		if (cs_list->cs[i].info.flags & CS_FL_REPLICATING) {
 			__set_bit(i, &cs_list->blacklist);
+			cs_list->flags |= CS_FL_REPLICATING;
 			cs_list->blacklist_expires = jiffies + PCS_REPLICATION_BLACKLIST_TIMEOUT;
 		}
 
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index bfe0719eebe2..8cc7dfefc17e 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -220,6 +220,7 @@ unsigned long pcs_map_shrink_scan(struct shrinker *,  struct shrink_control *sc)
 void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 extern unsigned int cs_io_locality;
+extern unsigned int cs_enable_fanout;
 
 void cslist_destroy(struct pcs_cs_list * csl);
 
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index d1ed5484f41b..def0073b9509 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -23,6 +23,7 @@
 
 #define PCS_VZ7_VERSION 100
 #define PCS_CS_MSG_ALIGNED_VERSION 134
+#define PCS_CS_FANOUT 177
 
 /* milliseconds since Jan 1970 */
 typedef u64 PCS_FILETIME_T;
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 8ee32b33f3f0..c677332ead28 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -15,6 +15,7 @@
 #include "pcs_cs_prot.h"
 #include "pcs_rpc.h"
 #include "pcs_cs.h"
+#include "fuse_ktrace_prot.h"
 #include "fuse_stat.h"
 #include "../../fuse_i.h"
 
@@ -60,6 +61,7 @@ struct pcs_aio_req
 };
 
 #define PCS_MAX_ACCEL_CS	3
+#define PCS_MAP_MAX_FO_CS	8
 
 struct pcs_accel_write_req
 {
@@ -75,12 +77,11 @@ struct pcs_accel_write_req
 
 struct pcs_accel_req
 {
-	struct pcs_int_request		*parent;
 	atomic_t			iocount;
 	int				num_awr;
 	struct pcs_accel_write_req	awr[PCS_MAX_ACCEL_CS];
 	int				num_iotimes;
-	struct pcs_cs_sync_resp		io_times[PCS_MAX_ACCEL_CS];
+	struct fuse_tr_iotimes_cs	io_times[PCS_MAX_ACCEL_CS];
 	struct work_struct		work;
 };
 
@@ -93,6 +94,13 @@ struct pcs_iochunk_req {
 	struct pcs_int_request	*parent_N;
 };
 
+struct pcs_fo_req
+{
+	atomic_t			iocount;
+	int				num_iotimes;
+	struct fuse_tr_iotimes_cs	io_times[PCS_MAP_MAX_FO_CS];
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -119,6 +127,7 @@ struct pcs_int_request
 #define IREQ_F_CRYPT		0x2000
 #define IREQ_F_ACCELERROR	0x4000
 #define IREQ_F_NOACCT		0x8000
+#define IREQ_F_FANOUT	       0x10000
 
 	atomic_t		iocount;
 
@@ -179,6 +188,7 @@ struct pcs_int_request
 					struct pcs_int_request	*parent_N;
 				};
 				struct pcs_iochunk_req		ir;
+				struct pcs_fo_req		fo;
 				struct pcs_aio_req		ar;
 				struct pcs_accel_req		acr;
 			};


More information about the Devel mailing list