[Devel] [PATCH VZ9 08/20] fuse: implement fanout writes

Alexey Kuznetsov kuznet at virtuozzo.com
Fri Oct 6 13:43:01 MSK 2023


Module parameter cs_enable_fanout=1 is required.

When fanout is legitimate, writes are sent not along the chain of CSes,
but to each CS individually, point to point. This works especially well
with kernel-accelerated CSes, which were already partially fanout.

This can be useful when the network is very fast and sending three
requests instead of one over the same network is not a problem. It also
decreases the load on CSes, since they no longer need to forward writes
along the chain; on the other hand, it increases the load on the client.
Whether this yields a net profit depends on the workload.

Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
 fs/fuse/kio/pcs/pcs_cs.c         | 214 ++++++++++++++++++++++++++++++++++-----
 fs/fuse/kio/pcs/pcs_cs.h         |   1 +
 fs/fuse/kio/pcs/pcs_cs_accel.c   |  61 +++++++----
 fs/fuse/kio/pcs/pcs_cs_prot.h    |  14 ++-
 fs/fuse/kio/pcs/pcs_map.c        |   5 +
 fs/fuse/kio/pcs/pcs_map.h        |   1 +
 fs/fuse/kio/pcs/pcs_prot_types.h |   1 +
 fs/fuse/kio/pcs/pcs_req.h        |  14 ++-
 8 files changed, 261 insertions(+), 50 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index fffbce6..59b752d 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -292,7 +292,22 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
 	int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
 
 	if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
-		pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		if (!(ireq->flags & IREQ_F_FANOUT)) {
+			pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+		} else {
+			int idx = ireq->iochunk.cs_index;
+			struct pcs_int_request * parent = ireq->iochunk.parent_N;
+
+			parent->iochunk.fo.io_times[idx].csid = resp->rpc->peer_id.val;
+			/* XXX kio does not implement flow detection (for now) and does
+			 * not use flag PCS_CS_IO_SEQ. So, use it here to indicate
+			 * performed fanout.
+			 */
+			parent->iochunk.fo.io_times[idx].misc = h->sync.misc | PCS_CS_IO_SEQ;
+			parent->iochunk.fo.io_times[idx].ts_net = h->sync.ts_net;
+			parent->iochunk.fo.io_times[idx].ts_io = h->sync.ts_io;
+
+		}
 		return;
 	}
 
@@ -605,41 +620,105 @@ static void cs_sent(struct pcs_msg *msg)
 	pcs_rpc_sent(msg);
 }
 
-void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+static void ireq_complete_fo(struct pcs_int_request * ireq)
 {
-	struct pcs_msg *msg = &ireq->iochunk.msg;
-	struct pcs_cs_iohdr *ioh;
-	struct pcs_cs_list *csl = ireq->iochunk.csl;
-	struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference to map */
-	int storage_version = atomic_read(&ireq->cc->storage_version);
-	int aligned_msg;
+	if (!atomic_dec_and_test(&ireq->iochunk.fo.iocount))
+		return;
 
-	BUG_ON(msg->rpc);
+	if (pcs_if_error(&ireq->error)) {
+		FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+		      ireq->error.value,
+		      ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+		      ireq, (unsigned long long)ireq->iochunk.chunk,
+		      (unsigned)ireq->iochunk.offset,
+		      (unsigned)ireq->iochunk.size);
+	} else if (ireq->iochunk.parent_N) {
+		struct pcs_int_request * parent = ireq->iochunk.parent_N;
+		int n = ireq->iochunk.fo.num_iotimes;
+		int idx = ireq->iochunk.cs_index;
 
-	ireq->ts_sent = ktime_get();
+		if (idx < parent->iochunk.acr.num_awr)
+			idx = parent->iochunk.acr.num_awr;
 
-	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
-		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
-			if (pcs_csa_cs_submit(cs, ireq))
-				return;
-		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
-			/* Synchronous writes in accel mode are still not supported */
-			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
-			    !ireq->dentry->no_write_delay) {
-				struct pcs_int_request * sreq;
+		if (n > PCS_MAX_ACCEL_CS)
+			n = PCS_MAX_ACCEL_CS;
 
-				sreq = pcs_csa_csl_write_submit(ireq);
-				if (!sreq)
-					return;
-				if (sreq != ireq) {
-					ireq = sreq;
-					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
-					msg = &ireq->iochunk.msg;
+		if (n > idx) {
+			memcpy(&parent->iochunk.acr.io_times[idx], &ireq->iochunk.fo.io_times[idx],
+			       (n - idx) * sizeof(struct fuse_tr_iotimes_cs));
+		}
+		parent->iochunk.acr.num_iotimes = n;
+	} else {
+		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+		fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+		if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+			struct fuse_trace_hdr * t;
+			int n = ireq->iochunk.fo.num_iotimes;
+
+			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+					       n*sizeof(struct fuse_tr_iotimes_cs));
+			if (t) {
+				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+				int i;
+
+				th->chunk = ireq->iochunk.chunk;
+				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+				th->size = ireq->iochunk.size;
+				th->start_time = ktime_to_us(ireq->ts);
+				th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+				th->lat = t->time - ktime_to_us(ireq->ts_sent);
+				th->ino = ireq->dentry->fileinfo.attr.id;
+				th->type = PCS_CS_WRITE_AL_RESP;
+				th->cses = n;
+
+				for (i = 0; i < n; i++, ch++) {
+					*ch = ireq->iochunk.fo.io_times[i];
 				}
 			}
 		}
+		FUSE_TRACE_COMMIT(fc->ktrace);
+	}
+	ireq->iochunk.msg.destructor = NULL;
+	ireq->iochunk.msg.rpc = NULL;
+	ireq_complete(ireq);
+}
+
+static void complete_fo_request(struct pcs_int_request * sreq)
+{
+	struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+	if (pcs_if_error(&sreq->error)) {
+		if (!pcs_if_error(&ireq->error))
+			ireq->error = sreq->error;
 	}
 
+	/* And free all clone resources */
+	pcs_sreq_detach(sreq);
+	if (sreq->iochunk.map)
+		pcs_map_put(sreq->iochunk.map);
+	if (sreq->iochunk.csl)
+		cslist_put(sreq->iochunk.csl);
+	if (sreq->iochunk.flow)
+		pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+	ireq_destroy(sreq);
+
+	ireq_complete_fo(ireq);
+}
+
+static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+	struct pcs_msg *msg = &ireq->iochunk.msg;
+	struct pcs_cs_iohdr *ioh;
+	struct pcs_cs_list *csl = ireq->iochunk.csl;
+	struct pcs_map_entry *map = ireq->iochunk.map; /* ireq keeps reference to map */
+	int storage_version = atomic_read(&ireq->cc->storage_version);
+	int aligned_msg;
+
+	BUG_ON(msg->rpc);
+
 	msg->private = cs;
 	msg->private2 = ireq;
 
@@ -685,6 +764,8 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
                 ioh->sync.misc |= PCS_CS_IO_NOCSUM;
 	if ((ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) || ireq->dentry->no_write_delay)
 		ioh->sync.misc |= PCS_CS_IO_SYNC;
+	if (ireq->flags & IREQ_F_FANOUT)
+		ioh->sync.misc = PCS_CS_IO_FANOUT;
 
 	msg->size = ioh->hdr.len;
 	msg->rpc = NULL;
@@ -731,6 +812,87 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	pcs_rpc_queue(cs->rpc, msg);
 }
 
+static inline int eligible_for_fanout(struct pcs_int_request * ireq)
+{
+	return (cs_enable_fanout && pcs_cs_fanout(atomic_read(&ireq->cc->storage_version)) &&
+		ireq->iochunk.csl->nsrv <= PCS_MAP_MAX_FO_CS &&
+		ireq->iochunk.cs_index + 1 < ireq->iochunk.csl->nsrv &&
+		!(ireq->iochunk.csl->flags & CS_FL_REPLICATING));
+}
+
+void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
+{
+	ireq->ts_sent = ktime_get();
+
+	if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+		if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+			if (pcs_csa_cs_submit(cs, ireq))
+				return;
+		} else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+			/* Synchronous writes in accel mode are still not supported */
+			if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+			    !ireq->dentry->no_write_delay) {
+				struct pcs_int_request * sreq;
+
+				sreq = pcs_csa_csl_write_submit(ireq);
+				if (!sreq)
+					return;
+				if (sreq != ireq) {
+					ireq = sreq;
+					cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+				}
+			}
+		}
+	}
+
+	if (ireq->iochunk.cmd == PCS_REQ_T_WRITE && eligible_for_fanout(ireq)) {
+		int idx = ireq->iochunk.cs_index;
+		struct pcs_cs_list * csl = ireq->iochunk.csl;
+
+		atomic_set(&ireq->iochunk.fo.iocount, 1);
+		ireq->iochunk.fo.num_iotimes = csl->nsrv;
+
+		for (; idx < csl->nsrv; idx++) {
+			struct pcs_int_request * sreq;
+
+			sreq = pcs_ireq_split(ireq, 0, 1);
+			if (sreq == NULL) {
+				ireq->error.remote = 1;
+				ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+				ireq->error.value = PCS_ERR_NORES;
+				ireq_complete_fo(ireq);
+				return;
+			}
+
+			sreq->iochunk.size = ireq->iochunk.size;
+			sreq->iochunk.csl = ireq->iochunk.csl;
+			cslist_get(ireq->iochunk.csl);
+			sreq->flags |= IREQ_F_NOACCT|IREQ_F_FANOUT;
+			sreq->complete_cb = complete_fo_request;
+			sreq->iochunk.parent_N = ireq;
+			sreq->iochunk.cs_index = idx;
+			atomic_inc(&ireq->iochunk.fo.iocount);
+
+			/* If it is not the first cs on remaining chain, we can
+			 * try to check for eligibility for direct cs access.
+			 * This will handle the case when a local cs is not at head of
+			 * chain.
+			 */
+			if (idx == ireq->iochunk.cs_index ||
+			    (ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) ||
+			    ireq->dentry->no_write_delay ||
+			    ((ireq->iochunk.size|ireq->iochunk.offset) & 511) ||
+			    (ireq->flags & IREQ_F_NO_ACCEL) ||
+			    !pcs_csa_csl_write_submit_single(sreq, idx))
+				do_cs_submit(ireq->iochunk.csl->cs[idx].cslink.cs, sreq);
+		}
+		ireq_complete_fo(ireq);
+		return;
+	}
+
+	do_cs_submit(cs, ireq);
+}
+
 static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 {
 	struct pcs_cs *who;
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0b17e924a..61be99a 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -215,6 +215,7 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
 
 int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
 void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
 void pcs_csa_cs_detach(struct pcs_cs * cs);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index ae8562e..3b75571 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -692,7 +692,6 @@ int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
 
 static void ireq_init_acr(struct pcs_int_request * ireq)
 {
-	ireq->iochunk.parent_N = NULL;
 	atomic_set(&ireq->iochunk.acr.iocount, 1);
 	ireq->iochunk.acr.num_awr = 0;
 	ireq->iochunk.acr.num_iotimes = 0;
@@ -715,7 +714,6 @@ static void ireq_clear_acr(struct pcs_int_request * ireq)
 	}
 	ireq->iochunk.msg.destructor = NULL;
 	ireq->iochunk.msg.rpc = NULL;
-	ireq->iochunk.parent_N = NULL;
 	ireq->flags |= IREQ_F_NO_ACCEL;
 }
 
@@ -724,13 +722,19 @@ void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr *
 	int idx = ireq->iochunk.acr.num_awr;
 	struct pcs_cs_sync_resp * srec;
 
-	ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
-	ireq->iochunk.acr.io_times[idx].sync = h->sync;
+	ireq->iochunk.acr.io_times[idx].csid = cs_id.val;
+	ireq->iochunk.acr.io_times[idx].misc = h->sync.misc;
+	ireq->iochunk.acr.io_times[idx].ts_net = h->sync.ts_net;
+	ireq->iochunk.acr.io_times[idx].ts_io = h->sync.ts_io;
 
 	for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
 	     (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
-	     srec++, idx++)
-		ireq->iochunk.acr.io_times[idx] = *srec;
+	     srec++, idx++) {
+		ireq->iochunk.acr.io_times[idx].csid = srec->cs_id.val;
+		ireq->iochunk.acr.io_times[idx].misc = srec->sync.misc;
+		ireq->iochunk.acr.io_times[idx].ts_net = srec->sync.ts_net;
+		ireq->iochunk.acr.io_times[idx].ts_io = srec->sync.ts_io;
+	}
 
 	ireq->iochunk.acr.num_iotimes = idx;
 }
@@ -746,6 +750,13 @@ static void __complete_acr_work(struct work_struct * w)
 		      ireq, (unsigned long long)ireq->iochunk.chunk,
 		      (unsigned)ireq->iochunk.offset,
 		      (unsigned)ireq->iochunk.size);
+	} else if (ireq->iochunk.parent_N) {
+		struct pcs_int_request * parent = ireq->iochunk.parent_N;
+		int idx = ireq->iochunk.cs_index;
+
+		WARN_ON(!(ireq->flags & IREQ_F_FANOUT));
+		parent->iochunk.fo.io_times[idx] = ireq->iochunk.acr.io_times[idx];
+		parent->iochunk.fo.io_times[idx].misc |= PCS_CS_IO_SEQ;
 	} else {
 		struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
 
@@ -772,13 +783,8 @@ static void __complete_acr_work(struct work_struct * w)
 				th->type = PCS_CS_WRITE_AL_RESP;
 				th->cses = n;
 
-				for (i = 0; i < n; i++) {
-					ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
-					ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
-					ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
-					ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
-					ch++;
-				}
+				for (i = 0; i < n; i++, ch++)
+					*ch = ireq->iochunk.acr.io_times[i];
 			}
 		}
 		FUSE_TRACE_COMMIT(fc->ktrace);
@@ -807,10 +813,11 @@ static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
 	ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
 
 	if (!pcs_if_error(&ireq->error)) {
-		struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
-		sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
-		sresp->sync.ts_net = 0;
-		sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+		struct fuse_tr_iotimes_cs * th = &ireq->iochunk.acr.io_times[areq->index];
+		th->csid = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+		th->ts_net = 0;
+		th->ts_io = ktime_to_us(ktime_get()) - th->misc;
+		th->misc &= PCS_CS_TS_MASK;
 	}
 
 	csa_complete_acr(ireq);
@@ -992,7 +999,7 @@ static inline int csa_submit_write(struct file * file, struct pcs_int_request *
 	areq->index = idx;
 	ireq->iochunk.acr.num_awr = idx + 1;
 
-	ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+	ireq->iochunk.acr.io_times[idx].misc = ktime_to_us(ktime_get());
 
 	ret = call_write_iter(file, iocb, it);
 
@@ -1090,7 +1097,6 @@ static void complete_N_request(struct pcs_int_request * sreq)
 	csa_complete_acr(ireq);
 }
 
-
 struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
 {
 	int idx;
@@ -1149,6 +1155,23 @@ struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
 }
 
 
+int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx)
+{
+	if (idx >= PCS_MAX_ACCEL_CS)
+		return 0;
+
+	ireq_init_acr(ireq);
+
+	if (!csa_cs_submit_write(ireq, idx)) {
+		ireq_clear_acr(ireq);
+		return 0;
+	}
+
+	ireq->iochunk.acr.num_iotimes = idx;
+	csa_complete_acr(ireq);
+	return 1;
+}
+
 static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
 	struct pcs_csa_context *ctx = file->private_data;
diff --git a/fs/fuse/kio/pcs/pcs_cs_prot.h b/fs/fuse/kio/pcs/pcs_cs_prot.h
index 6e0d371..c72066d 100644
--- a/fs/fuse/kio/pcs/pcs_cs_prot.h
+++ b/fs/fuse/kio/pcs/pcs_cs_prot.h
@@ -40,10 +40,13 @@ struct pcs_cs_sync_data
 #define PCS_CS_IO_NOCSUM	(1ULL<<61)	/* Req: skip crc verification */
 #define PCS_CS_IO_SYNC		(1ULL<<60)	/* Req: DSYNC request */
 #define PCS_CS_IO_BACKGROUND	(1ULL<<59)	/* Req: low priority request */
+#define PCS_CS_IO_FANOUT	(1ULL<<58)	/* Req: request must not be forwarded */
+#define PCS_CS_IO_CLEAR		(1ULL<<57)	/* Req/resp: indicator that write is done to stable chain */
 
-#define PCS_CS_RESET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_SET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((sdata)->misc & ~0xFFFFFFFFFFFFFULL) | ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
-#define PCS_CS_ADD_TS_RECV(sdata, ts)	do { (sdata)->misc |= ((u64)ts & 0xFFFFFFFFFFFFFULL); } while (0)
+#define PCS_CS_TS_MASK		0xFFFFFFFFFFFFFULL
+#define PCS_CS_RESET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((u64)ts & PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_SET_TS_RECV(sdata, ts)	do { (sdata)->misc = ((sdata)->misc & ~PCS_CS_TS_MASK) | ((u64)ts & PCS_CS_TS_MASK); } while (0)
+#define PCS_CS_ADD_TS_RECV(sdata, ts)	do { (sdata)->misc |= ((u64)ts & PCS_CS_TS_MASK); } while (0)
 #define PCS_CS_GET_TS_RECV(sdata)	((sdata)->misc & 0xFFFFFFFFFFFFFULL)
 
 struct pcs_cs_sync_resp {
@@ -84,6 +87,11 @@ static inline int pcs_cs_use_aligned_io(u32 storage_version)
 	return (storage_version >= PCS_CS_MSG_ALIGNED_VERSION);
 }
 
+static inline int pcs_cs_fanout(u32 storage_version)
+{
+	return (storage_version >= PCS_CS_FANOUT);
+}
+
 /* Maximal message size. Actually, random */
 #define PCS_CS_MSG_MAX_SIZE	(1024*1024 + sizeof(struct pcs_cs_iohdr))
 
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 634775b..29ed4fa 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -42,6 +42,10 @@
 module_param(cs_io_locality, uint, 0644);
 MODULE_PARM_DESC(cs_io_locality, "CS IO locality");
 
+unsigned int cs_enable_fanout = 0;
+module_param(cs_enable_fanout, uint, 0644);
+MODULE_PARM_DESC(cs_enable_fanout, "Enable CS fanout");
+
 static struct pcs_cs_list *cs_link_to_cs_list(struct pcs_cs_link *csl)
 {
 	struct pcs_cs_record *cs_rec;
@@ -976,6 +980,7 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 
 		if (cs_list->cs[i].info.flags & CS_FL_REPLICATING) {
 			__set_bit(i, &cs_list->blacklist);
+			cs_list->flags |= CS_FL_REPLICATING;
 			cs_list->blacklist_expires = jiffies + PCS_REPLICATION_BLACKLIST_TIMEOUT;
 		}
 
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index bfe0719..8cc7dfe 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -220,6 +220,7 @@ static inline struct pcs_cluster_core *cc_from_map(struct pcs_map_entry * m)
 void ireq_drop_tokens(struct pcs_int_request * ireq);
 
 extern unsigned int cs_io_locality;
+extern unsigned int cs_enable_fanout;
 
 void cslist_destroy(struct pcs_cs_list * csl);
 
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index d1ed548..def0073 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -23,6 +23,7 @@
 
 #define PCS_VZ7_VERSION 100
 #define PCS_CS_MSG_ALIGNED_VERSION 134
+#define PCS_CS_FANOUT 177
 
 /* milliseconds since Jan 1970 */
 typedef u64 PCS_FILETIME_T;
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 8ee32b3..c677332 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -15,6 +15,7 @@
 #include "pcs_cs_prot.h"
 #include "pcs_rpc.h"
 #include "pcs_cs.h"
+#include "fuse_ktrace_prot.h"
 #include "fuse_stat.h"
 #include "../../fuse_i.h"
 
@@ -60,6 +61,7 @@ struct pcs_aio_req
 };
 
 #define PCS_MAX_ACCEL_CS	3
+#define PCS_MAP_MAX_FO_CS	8
 
 struct pcs_accel_write_req
 {
@@ -75,12 +77,11 @@ struct pcs_accel_write_req
 
 struct pcs_accel_req
 {
-	struct pcs_int_request		*parent;
 	atomic_t			iocount;
 	int				num_awr;
 	struct pcs_accel_write_req	awr[PCS_MAX_ACCEL_CS];
 	int				num_iotimes;
-	struct pcs_cs_sync_resp		io_times[PCS_MAX_ACCEL_CS];
+	struct fuse_tr_iotimes_cs	io_times[PCS_MAX_ACCEL_CS];
 	struct work_struct		work;
 };
 
@@ -93,6 +94,13 @@ struct pcs_iochunk_req {
 	struct pcs_int_request	*parent_N;
 };
 
+struct pcs_fo_req
+{
+	atomic_t			iocount;
+	int				num_iotimes;
+	struct fuse_tr_iotimes_cs	io_times[PCS_MAP_MAX_FO_CS];
+};
+
 struct pcs_int_request
 {
 	struct pcs_cluster_core* cc;
@@ -119,6 +127,7 @@ struct pcs_int_request
 #define IREQ_F_CRYPT		0x2000
 #define IREQ_F_ACCELERROR	0x4000
 #define IREQ_F_NOACCT		0x8000
+#define IREQ_F_FANOUT	       0x10000
 
 	atomic_t		iocount;
 
@@ -179,6 +188,7 @@ struct pcs_int_request
 					struct pcs_int_request	*parent_N;
 				};
 				struct pcs_iochunk_req		ir;
+				struct pcs_fo_req		fo;
 				struct pcs_aio_req		ar;
 				struct pcs_accel_req		acr;
 			};
-- 
1.8.3.1



More information about the Devel mailing list