[Devel] [PATCH VZ9 07/20] fuse: cs acceleration for writes
Alexey Kuznetsov
kuznet at virtuozzo.com
Fri Oct 6 13:42:55 MSK 2023
If we have a chain of CSes:
D1 -> D2 -> Dk -> Nk+1 ... -> Nn
whered k D-CSes are accessible locally and Nk+1 .. Nn are accessible
only through network (some of them can happen to be local).
We can fanout direct writes to D1, ... and Dk and send a write to remaining tail
Nk+1 ... -> Nn.
It is for user-space CS to decide when this is legal thing to do.
Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
fs/fuse/kio/pcs/pcs_cs.c | 32 ++-
fs/fuse/kio/pcs/pcs_cs.h | 2 +
fs/fuse/kio/pcs/pcs_cs_accel.c | 518 +++++++++++++++++++++++++++++++++++++++--
fs/fuse/kio/pcs/pcs_map.c | 18 +-
fs/fuse/kio/pcs/pcs_map.h | 16 ++
fs/fuse/kio/pcs/pcs_req.h | 38 +++
6 files changed, 592 insertions(+), 32 deletions(-)
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index c518cc9..fffbce6 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -291,6 +291,11 @@ void cs_log_io_times(struct pcs_int_request * ireq, struct pcs_msg * resp, unsig
struct pcs_cs_iohdr * h = (struct pcs_cs_iohdr *)msg_inline_head(resp);
int reqt = h->hdr.type != PCS_CS_SYNC_RESP ? ireq->iochunk.cmd : PCS_REQ_T_SYNC;
+ if (ireq->iochunk.parent_N && h->hdr.type != PCS_CS_READ_RESP && h->hdr.type != PCS_CS_FIEMAP_RESP) {
+ pcs_csa_relay_iotimes(ireq->iochunk.parent_N, h, resp->rpc->peer_id);
+ return;
+ }
+
fuse_stat_observe(fc, reqt, ktime_sub(ktime_get(), ireq->ts_sent));
if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
int n = 1;
@@ -611,10 +616,28 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
BUG_ON(msg->rpc);
- if (ireq->iochunk.cmd == PCS_REQ_T_READ && !((ireq->iochunk.size|ireq->iochunk.offset) & 511) &&
- !(ireq->flags & IREQ_F_NO_ACCEL)) {
- if (pcs_csa_cs_submit(cs, ireq))
- return;
+ ireq->ts_sent = ktime_get();
+
+ if (!((ireq->iochunk.size|ireq->iochunk.offset) & 511) && !(ireq->flags & IREQ_F_NO_ACCEL)) {
+ if (ireq->iochunk.cmd == PCS_REQ_T_READ) {
+ if (pcs_csa_cs_submit(cs, ireq))
+ return;
+ } else if (ireq->iochunk.cmd == PCS_REQ_T_WRITE) {
+ /* Synchronous writes in accel mode are still not supported */
+ if (!(ireq->dentry->fileinfo.attr.attrib & PCS_FATTR_IMMEDIATE_WRITE) &&
+ !ireq->dentry->no_write_delay) {
+ struct pcs_int_request * sreq;
+
+ sreq = pcs_csa_csl_write_submit(ireq);
+ if (!sreq)
+ return;
+ if (sreq != ireq) {
+ ireq = sreq;
+ cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
+ msg = &ireq->iochunk.msg;
+ }
+ }
+ }
}
msg->private = cs;
@@ -686,7 +709,6 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
msg->timeout = csl->write_timeout;
else
msg->timeout = csl->read_timeout;
- ireq->ts_sent = ktime_get();
ireq->wait_origin.val = 0;
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 0564ef3..0b17e924a 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -214,6 +214,8 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
struct pcs_msg* pcs_alloc_cs_msg(u32 type, u32 size, u32 storage_version);
int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq);
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq);
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
void pcs_csa_cs_detach(struct pcs_cs * cs);
#endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index fc6a35f..ae8562e 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -146,14 +146,9 @@ void pcs_csa_cs_detach(struct pcs_cs * cs)
}
}
-static inline struct pcs_csa_entry * cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
+static inline struct pcs_csa_entry * __cse_lookup(struct pcs_csa_context * ctx, u64 chunk_id)
{
- struct pcs_csa_entry * cse;
-
- rcu_read_lock();
- cse= radix_tree_lookup(&ctx->tree, chunk_id);
- rcu_read_unlock();
- return cse;
+ return radix_tree_lookup(&ctx->tree, chunk_id);
}
static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u32 flags, PCS_MAP_VERSION_T * vers,
@@ -372,12 +367,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
if (!pcs_if_error(&ireq->error) && (ireq->flags & IREQ_F_CRYPT)) {
struct pcs_cs * cs = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].cslink.cs;
- struct pcs_csa_context * ctx = rcu_dereference(cs->csa_ctx);
+ struct pcs_csa_context * ctx;
+ rcu_read_lock();
+ ctx = rcu_dereference(cs->csa_ctx);
if (!ctx || !ctx->tfm || decrypt_data(ireq, ctx->tfm)) {
ireq->error.remote = 1;
ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
ireq->error.value = PCS_ERR_IO;
}
+ rcu_read_unlock();
}
if (areq->crc) {
@@ -564,7 +562,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
BUG_ON(parent->type != PCS_IREQ_API);
ar = parent->apireq.req;
- ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, 0);
+ ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, READ);
if (!iov_iter_is_bvec(it)) {
FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
return -EINVAL;
@@ -581,7 +579,6 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
/* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
atomic_set(&areq->iocount, 2);
- ireq->ts_sent = ktime_get();
ret = call_read_iter(file, iocb, it);
if (unlikely(ret != -EIOCBQUEUED)) {
@@ -637,28 +634,521 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
int pcs_csa_cs_submit(struct pcs_cs * cs, struct pcs_int_request * ireq)
{
- struct pcs_csa_context * csa_ctx = rcu_dereference(cs->csa_ctx);
+ struct pcs_csa_context * csa_ctx;
+
+ rcu_read_lock();
+ csa_ctx = rcu_dereference(cs->csa_ctx);
if (csa_ctx) {
struct pcs_map_entry * map = ireq->iochunk.map;
- struct pcs_csa_entry * csa = cse_lookup(csa_ctx, map->id);
+ struct pcs_csa_entry * csa;
+
+ csa = __cse_lookup(csa_ctx, map->id);
if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
(csa->flags & PCS_CSA_FL_READ)) {
/* XXX Paranoia? Verify! */
if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+ struct file * file = get_file(csa->file);
+ struct file * cfile = csa->cfile ? get_file(csa->cfile) : NULL;
+ unsigned int flags = csa->flags;
+ int err;
+
if (csa_ctx->tfm)
ireq->flags |= IREQ_F_CRYPT;
- if (!csa_submit(csa->file, csa->cfile, csa->flags&PCS_CSA_FL_CSUM, ireq))
+
+ rcu_read_unlock();
+ err = csa_submit(file, cfile, flags&PCS_CSA_FL_CSUM, ireq);
+ fput(file);
+ if (cfile)
+ fput(cfile);
+ if (!err)
return 1;
+ rcu_read_lock();
/* Clear state which could be rewritten by csa_submit */
ireq->iochunk.msg.destructor = NULL;
ireq->iochunk.msg.rpc = NULL;
}
}
}
+ rcu_read_unlock();
return 0;
}
+/* Write engine. It is similar to read, code could be merged. Actually the situation
+ * with nsrv=1 is just exactly the same. But yet reads can be optimized a lot better
+ * and we do not want to lose this advantage.
+ *
+ * Terminology:
+ * Original ireq - ireq which is supposed to be submitted to head of cs chain
+ * D-request - replicas at head of chain which have accelrated mappings and eligible
+ * for local aio processing
+ * They are presented as struct's pcs_accel_write_req which are stored
+ * as element of array awr[i] in struct pcs_accel_req in original ireq.iochunk.acr
+ * N-request - Request to be submitted to tail of cs chain following the last D-request
+ * It is presented as cloned original ireq with overriden completion callback,
+ * so that its errors and not preocessed, but copied to the original ireq
+ * to be processed on completion of original.
+ */
+
+static void ireq_init_acr(struct pcs_int_request * ireq)
+{
+ ireq->iochunk.parent_N = NULL;
+ atomic_set(&ireq->iochunk.acr.iocount, 1);
+ ireq->iochunk.acr.num_awr = 0;
+ ireq->iochunk.acr.num_iotimes = 0;
+}
+
+static void ireq_clear_acr(struct pcs_int_request * ireq)
+{
+ int i, n;
+
+ for (i = 0; i < ireq->iochunk.acr.num_awr; i++) {
+ struct bio_vec * bvec = ireq->iochunk.acr.awr[i].bvec_copy;
+ if (bvec) {
+ for (n = ireq->iochunk.acr.awr[i].num_copy_bvecs-1; n>=0; n--) {
+ if (bvec[n].bv_page)
+ put_page(bvec[n].bv_page);
+ }
+ kfree(bvec);
+ ireq->iochunk.acr.awr[i].bvec_copy = NULL;
+ }
+ }
+ ireq->iochunk.msg.destructor = NULL;
+ ireq->iochunk.msg.rpc = NULL;
+ ireq->iochunk.parent_N = NULL;
+ ireq->flags |= IREQ_F_NO_ACCEL;
+}
+
+void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id)
+{
+ int idx = ireq->iochunk.acr.num_awr;
+ struct pcs_cs_sync_resp * srec;
+
+ ireq->iochunk.acr.io_times[idx].cs_id = cs_id;
+ ireq->iochunk.acr.io_times[idx].sync = h->sync;
+
+ for (srec = (struct pcs_cs_sync_resp*)(h + 1), idx++;
+ (void*)(srec + 1) <= (void*)h + h->hdr.len && idx < PCS_MAX_ACCEL_CS;
+ srec++, idx++)
+ ireq->iochunk.acr.io_times[idx] = *srec;
+
+ ireq->iochunk.acr.num_iotimes = idx;
+}
+
+static void __complete_acr_work(struct work_struct * w)
+{
+ struct pcs_int_request * ireq = container_of(w, struct pcs_int_request, iochunk.acr.work);
+
+ if (pcs_if_error(&ireq->error)) {
+ FUSE_KTRACE(ireq->cc->fc, "IO error %d %lu, ireq:%p : %llu:%u+%u",
+ ireq->error.value,
+ ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+ ireq, (unsigned long long)ireq->iochunk.chunk,
+ (unsigned)ireq->iochunk.offset,
+ (unsigned)ireq->iochunk.size);
+ } else {
+ struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
+
+ fuse_stat_observe(fc, PCS_REQ_T_WRITE, ktime_sub(ktime_get(), ireq->ts_sent));
+
+ if (fc->ktrace && fc->ktrace_level >= LOG_TRACE) {
+ struct fuse_trace_hdr * t;
+ int n = ireq->iochunk.acr.num_iotimes;
+
+ t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
+ n*sizeof(struct fuse_tr_iotimes_cs));
+ if (t) {
+ struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
+ struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+ int i;
+
+ th->chunk = ireq->iochunk.chunk;
+ th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
+ th->size = ireq->iochunk.size;
+ th->start_time = ktime_to_us(ireq->ts);
+ th->local_delay = ktime_to_us(ktime_sub(ireq->ts_sent, ireq->ts));
+ th->lat = t->time - ktime_to_us(ireq->ts_sent);
+ th->ino = ireq->dentry->fileinfo.attr.id;
+ th->type = PCS_CS_WRITE_AL_RESP;
+ th->cses = n;
+
+ for (i = 0; i < n; i++) {
+ ch->csid = ireq->iochunk.acr.io_times[i].cs_id.val;
+ ch->misc = ireq->iochunk.acr.io_times[i].sync.misc;
+ ch->ts_net = ireq->iochunk.acr.io_times[i].sync.ts_net;
+ ch->ts_io = ireq->iochunk.acr.io_times[i].sync.ts_io;
+ ch++;
+ }
+ }
+ }
+ FUSE_TRACE_COMMIT(fc->ktrace);
+ }
+
+ ireq_clear_acr(ireq);
+ /* This will either complete or retry the whole request */
+ ireq_complete(ireq);
+}
+
+static inline void csa_complete_acr(struct pcs_int_request * ireq)
+{
+ if (atomic_dec_and_test(&ireq->iochunk.acr.iocount)) {
+ INIT_WORK(&ireq->iochunk.acr.work, __complete_acr_work);
+ queue_work(ireq->cc->wq, &ireq->iochunk.acr.work);
+ }
+}
+
+
+static void __pcs_csa_write_final_completion(struct pcs_accel_write_req *areq)
+{
+ struct pcs_int_request * ireq;
+
+ fput(areq->iocb.ki_filp);
+
+ ireq = container_of(areq - areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+ if (!pcs_if_error(&ireq->error)) {
+ struct pcs_cs_sync_resp * sresp = &ireq->iochunk.acr.io_times[areq->index];
+ sresp->cs_id.val = ireq->iochunk.csl->cs[areq->index].info.id.val | PCS_NODE_ALT_MASK;
+ sresp->sync.ts_net = 0;
+ sresp->sync.ts_io = ktime_to_us(ktime_get()) - sresp->sync.misc;
+ }
+
+ csa_complete_acr(ireq);
+}
+
+static void csa_write_complete_work(struct work_struct *w)
+{
+ struct pcs_accel_write_req * areq = container_of(w, struct pcs_accel_write_req, work);
+
+ __pcs_csa_write_final_completion(areq);
+}
+
+static void csa_write_complete(struct kiocb *iocb, long ret)
+{
+ struct pcs_accel_write_req * areq;
+ struct pcs_int_request * ireq;
+
+ areq = container_of(iocb, struct pcs_accel_write_req, iocb);
+ ireq = container_of(areq-areq->index, struct pcs_int_request, iochunk.acr.awr[0]);
+
+ if (ret != ireq->iochunk.size) {
+ if (!ireq->error.value) {
+ ireq->error.remote = 1;
+ ireq->error.offender = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id;
+ ireq->error.value = PCS_ERR_IO;
+ ireq->flags |= IREQ_F_ACCELERROR;
+ }
+ }
+
+ if (atomic_dec_and_test(&areq->iocount)) {
+ INIT_WORK(&areq->work, csa_write_complete_work);
+ queue_work(ireq->cc->wq, &areq->work);
+ }
+}
+
+static void encrypt_page_ctr(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+ unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+ struct scatterlist sgi, sgo;
+ struct { u64 a, b; } iv;
+ SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+ skcipher_request_set_sync_tfm(req, tfm);
+ skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+ sg_init_table(&sgi, 1);
+ sg_init_table(&sgo, 1);
+
+ iv.a = chunk_id;
+ iv.b = cpu_to_be64(pos / 16);
+ sg_set_page(&sgi, src, len, offset);
+ sg_set_page(&sgo, dst, len, offset);
+ skcipher_request_set_crypt(req, &sgi, &sgo, len, &iv);
+ crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+}
+
+static void encrypt_page_xts(struct crypto_sync_skcipher * tfm, struct page * dst, struct page *src,
+ unsigned int offset, unsigned int len, u64 pos, u64 chunk_id)
+{
+ struct scatterlist sgi, sgo;
+ struct { u64 a, b; } iv;
+ SYNC_SKCIPHER_REQUEST_ON_STACK(req, tfm);
+
+ skcipher_request_set_sync_tfm(req, tfm);
+ skcipher_request_set_callback(req, CRYPTO_TFM_REQ_MAY_SLEEP, NULL, NULL);
+ sg_init_table(&sgi, 1);
+ sg_init_table(&sgo, 1);
+
+ for ( ; len > 0; len -= 512) {
+ iv.a = pos / 512;
+ iv.b = chunk_id;
+ sg_set_page(&sgi, src, 512, offset);
+ sg_set_page(&sgo, dst, 512, offset);
+ skcipher_request_set_crypt(req, &sgi, &sgo, 512, &iv);
+ crypto_skcipher_alg(crypto_skcipher_reqtfm(req))->encrypt(req);
+ pos += 512;
+ offset += 512;
+ }
+}
+
+static int init_crypted_data(struct pcs_int_request * ireq, int idx)
+{
+ struct pcs_int_request *parent = ireq->completion_data.parent;
+ struct pcs_fuse_req * r;
+ struct bio_vec * bvec;
+ int n, nvec;
+ u64 pos;
+ u64 chunk_id;
+ struct pcs_csa_context * csa_ctx;
+ struct crypto_sync_skcipher * tfm;
+
+ BUG_ON(parent->type != PCS_IREQ_API);
+ r = parent->apireq.req->datasource;
+
+ nvec = r->exec.io.num_bvecs;
+
+ /* XXX oops, this can sleep. tfm can be destroyed. Need refcount yet?
+ * Seems, not. We just have to refetch tfm from cs after allocations,
+ * failing if it is destroyed already.
+ */
+ bvec = kmalloc(sizeof(struct bio_vec) * nvec, GFP_NOIO);
+ if (!bvec)
+ return -ENOMEM;
+
+ for (n = 0; n < nvec; n++) {
+ bvec[n] = r->exec.io.bvec[n];
+ if ((bvec[n].bv_offset|bvec[n].bv_len)&511)
+ goto out;
+ bvec[n].bv_page = alloc_page(GFP_NOIO);
+ if (!bvec[n].bv_page)
+ goto out;
+ }
+
+ rcu_read_lock();
+ csa_ctx = rcu_dereference(ireq->iochunk.csl->cs[idx].cslink.cs->csa_ctx);
+ if (!csa_ctx || ((tfm = rcu_dereference(csa_ctx->tfm)) == NULL)) {
+ rcu_read_unlock();
+ goto out;
+ }
+
+ pos = ireq->iochunk.offset;
+ chunk_id = ireq->iochunk.map->id;
+ for (n = 0; n < nvec; n++) {
+ if (tfm->base.base.__crt_alg->cra_priority == 400)
+ encrypt_page_ctr(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+ else
+ encrypt_page_xts(tfm, bvec[n].bv_page, r->exec.io.bvec[n].bv_page, bvec[n].bv_offset, bvec[n].bv_len, pos, chunk_id);
+ pos += bvec[n].bv_len;
+ }
+ rcu_read_unlock();
+
+ ireq->iochunk.acr.awr[idx].bvec_copy = bvec;
+ ireq->iochunk.acr.awr[idx].num_copy_bvecs = n;
+ return 0;
+
+out:
+ while (--n >= 0)
+ put_page(bvec[n].bv_page);
+ kfree(bvec);
+ return -ENOMEM;
+}
+
+static inline int csa_submit_write(struct file * file, struct pcs_int_request * ireq, int idx, int do_crypt)
+{
+ struct pcs_accel_write_req * areq = &ireq->iochunk.acr.awr[idx];
+ struct kiocb * iocb = &areq->iocb;
+ struct iov_iter iter;
+ struct iov_iter * it = &iter; /* Just to use this pointer instead of &iter */
+ unsigned int size = ireq->iochunk.size;
+ int ret;
+
+ if (do_crypt) {
+ if (init_crypted_data(ireq, idx))
+ return -EINVAL;
+ iov_iter_bvec(it, WRITE, areq->bvec_copy, areq->num_copy_bvecs, size);
+ } else {
+ struct pcs_int_request *parent = ireq->completion_data.parent;
+ pcs_api_iorequest_t *ar;
+
+ areq->bvec_copy = NULL;
+ BUG_ON(parent->type != PCS_IREQ_API);
+ ar = parent->apireq.req;
+ ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, WRITE);
+ if (!iov_iter_is_bvec(it)) {
+ FUSE_KTRACE(ireq->cc->fc, "Not a bvec, falling back");
+ return -EINVAL;
+ }
+ iov_iter_truncate(it, size);
+ }
+
+ iocb->ki_pos = ireq->iochunk.offset;
+ iocb->ki_filp = get_file(file);
+ iocb->ki_complete = csa_write_complete;
+ iocb->ki_flags = IOCB_DIRECT;
+ iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+ /* One ref is ours, other is for AIO. */
+ atomic_set(&areq->iocount, 2);
+ atomic_inc(&ireq->iochunk.acr.iocount);
+ areq->index = idx;
+ ireq->iochunk.acr.num_awr = idx + 1;
+
+ ireq->iochunk.acr.io_times[idx].sync.misc = ktime_to_us(ktime_get());
+
+ ret = call_write_iter(file, iocb, it);
+
+ if (unlikely(ret != -EIOCBQUEUED)) {
+ if (ret != size) {
+ if (!ireq->error.value) {
+ ireq->error.remote = 1;
+ ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+ ireq->error.value = PCS_ERR_IO;
+ }
+
+ /* Do not drop refs, we do not want to complete ireq. */
+ fput(areq->iocb.ki_filp);
+ FUSE_KTRACE(ireq->cc->fc, "AIO submit rejected ret=%d %lu, ireq:%p : %llu:%u+%u",
+ ret, ireq->error.remote ? (unsigned long)ireq->error.offender.val : 0UL,
+ ireq, (unsigned long long)ireq->iochunk.chunk,
+ (unsigned)ireq->iochunk.offset,
+ (unsigned)size);
+ if (atomic_dec_and_test(&ireq->iochunk.acr.iocount))
+ BUG();
+ return ret >= 0 ? -EIO : ret;
+ }
+
+ /* IO already finished. Drop AIO refcnt and proceed to crc */
+ FUSE_KTRACE(ireq->cc->fc, "No good, AIO executed synchronously, ireq:%p : %llu:%u+%u",
+ ireq, (unsigned long long)ireq->iochunk.chunk,
+ (unsigned)ireq->iochunk.offset,
+ (unsigned)size);
+
+ if (atomic_dec_and_test(&areq->iocount))
+ BUG();
+ }
+
+ if (atomic_dec_and_test(&areq->iocount)) {
+ INIT_WORK(&areq->work, csa_write_complete_work);
+ queue_work(ireq->cc->wq, &areq->work);
+ }
+ return 0;
+}
+
+static int csa_cs_submit_write(struct pcs_int_request * ireq, int idx)
+{
+ struct pcs_cs * cs = ireq->iochunk.csl->cs[idx].cslink.cs;
+ struct pcs_csa_context * csa_ctx;
+
+ if (idx >= PCS_MAX_ACCEL_CS)
+ return 0;
+
+ rcu_read_lock();
+ csa_ctx = rcu_dereference(cs->csa_ctx);
+ if (csa_ctx) {
+ struct pcs_map_entry * map = ireq->iochunk.map;
+ struct pcs_csa_entry * csa = __cse_lookup(csa_ctx, map->id);
+ if (csa && memcmp(&ireq->iochunk.csl->version, &csa->version, sizeof(PCS_MAP_VERSION_T)) == 0 &&
+ (csa->flags & PCS_CSA_FL_WRITE)) {
+ /* XXX Paranoia? Verify! */
+ if (!(map->state & PCS_MAP_DEAD) && map->cs_list == ireq->iochunk.csl) {
+ struct file * file = get_file(csa->file);
+ int do_crypt = (csa_ctx->tfm != NULL);
+ int err;
+
+ rcu_read_unlock();
+ err = csa_submit_write(file, ireq, idx, do_crypt);
+ fput(file);
+ return !err;
+ }
+ }
+ }
+ rcu_read_unlock();
+ return 0;
+}
+
+static void complete_N_request(struct pcs_int_request * sreq)
+{
+ struct pcs_int_request * ireq = sreq->iochunk.parent_N;
+
+ if (pcs_if_error(&sreq->error)) {
+ /* Error on N-request overrides any error on a D-request. */
+ ireq->error = sreq->error;
+ ireq->flags |= IREQ_F_NO_ACCEL;
+ /* Clear ACCELERROR to deliver this error normally, through invalidating the map */
+ ireq->flags &= ~IREQ_F_ACCELERROR;
+ }
+
+ /* And free all clone resources */
+ pcs_sreq_detach(sreq);
+ if (sreq->iochunk.map)
+ pcs_map_put(sreq->iochunk.map);
+ if (sreq->iochunk.csl)
+ cslist_put(sreq->iochunk.csl);
+ if (sreq->iochunk.flow)
+ pcs_flow_put(sreq->iochunk.flow, &sreq->cc->maps.ftab);
+ ireq_destroy(sreq);
+
+ csa_complete_acr(ireq);
+}
+
+
+struct pcs_int_request * pcs_csa_csl_write_submit(struct pcs_int_request * ireq)
+{
+ int idx;
+ struct pcs_cs_list *csl = ireq->iochunk.csl;
+
+ if (csl->nsrv > PCS_MAX_ACCEL_CS)
+ return ireq;
+
+ ireq_init_acr(ireq);
+
+ for (idx = 0; idx < csl->nsrv; idx++) {
+ if (!csa_cs_submit_write(ireq, idx))
+ break;
+ }
+
+ if (idx == 0) {
+ /* Nothing was handled. Just proceed to normal submit */
+ ireq_clear_acr(ireq);
+ return ireq;
+ } else if (idx >= csl->nsrv) {
+ /* Everything went locally. No network at all. */
+ ireq->iochunk.acr.num_iotimes = idx;
+ csa_complete_acr(ireq);
+ return NULL;
+ } else {
+ /* Harder case. We have to transmit to tail replicas */
+ struct pcs_int_request * sreq = pcs_ireq_split(ireq, 0, 1);
+ if (sreq == NULL) {
+ /* Some D replicas are submitted. So, we have to go
+ * through error cycle.
+ */
+ ireq->error.remote = 1;
+ ireq->error.offender = ireq->iochunk.csl->cs[idx].info.id;
+ ireq->error.value = PCS_ERR_NORES;
+ csa_complete_acr(ireq);
+ return NULL;
+ }
+
+ ireq->iochunk.acr.num_iotimes = idx;
+
+ /* ireq_split does not copy size and csl */
+ sreq->iochunk.size = ireq->iochunk.size;
+ sreq->iochunk.csl = ireq->iochunk.csl;
+ cslist_get(ireq->iochunk.csl);
+ /* Yet this sreq is not actually accounted, the accounting is made for original ireq */
+ sreq->flags |= IREQ_F_NOACCT;
+ sreq->complete_cb = complete_N_request;
+ sreq->iochunk.parent_N = ireq;
+ sreq->iochunk.cs_index = idx;
+
+ /* Our original iocount ref goes to N-request,
+ * Proceed with sending sreq to the tail of cs chain
+ */
+ return sreq;
+ }
+}
+
+
static long csa_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
struct pcs_csa_context *ctx = file->private_data;
@@ -717,6 +1207,7 @@ static int csa_release(struct inode *inode, struct file *file)
struct pcs_cs * cs;
ctx->dead = 1;
+ rcu_read_lock();
if ((cs = rcu_dereference(ctx->cs)) != NULL) {
spin_lock(&cs->lock);
if (ctx->cs == cs) {
@@ -728,6 +1219,7 @@ static int csa_release(struct inode *inode, struct file *file)
}
spin_unlock(&cs->lock);
}
+ rcu_read_unlock();
wake_up_poll(&ctx->wqh, EPOLLHUP);
pcs_csa_put(ctx);
return 0;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 9dc1c95..634775b 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -70,7 +70,7 @@ static inline unsigned int pcs_sync_timeout(struct pcs_cluster_core *cc)
return PCS_SYNC_TIMEOUT;
}
-static void cslist_destroy(struct pcs_cs_list * csl)
+void cslist_destroy(struct pcs_cs_list * csl)
{
int i;
@@ -97,19 +97,6 @@ static void cslist_destroy(struct pcs_cs_list * csl)
kfree(csl);
}
-static inline void cslist_get(struct pcs_cs_list * csl)
-{
- TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
-
- atomic_inc(&csl->refcnt);
-}
-static inline void cslist_put(struct pcs_cs_list * csl)
-{
- TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
- if (atomic_dec_and_test(&csl->refcnt))
- cslist_destroy(csl);
-}
-
static void map_drop_cslist(struct pcs_map_entry * m)
{
assert_spin_locked(&m->lock);
@@ -1579,6 +1566,9 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
unsigned long long match_id = 0;
struct pcs_cs_list * csl, ** csl_p = 0;
+ if (ireq->flags & IREQ_F_NOACCT)
+ return;
+
switch (ireq->type) {
case PCS_IREQ_IOCHUNK:
csl_p = &ireq->iochunk.csl;
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index e2b3c14..bfe0719 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -221,6 +221,22 @@ static inline struct pcs_cluster_core *cc_from_map(struct pcs_map_entry * m)
extern unsigned int cs_io_locality;
+void cslist_destroy(struct pcs_cs_list * csl);
+
+static inline void cslist_get(struct pcs_cs_list * csl)
+{
+ TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+
+ atomic_inc(&csl->refcnt);
+}
+
+static inline void cslist_put(struct pcs_cs_list * csl)
+{
+ TRACE("csl:%p csl->map:%p refcnt:%d\n", csl, csl->map, atomic_read(&csl->refcnt));
+ if (atomic_dec_and_test(&csl->refcnt))
+ cslist_destroy(csl);
+}
+
#define MAP_FMT "(%p) 0x%lld s:%x" DENTRY_FMT
#define MAP_ARGS(m) (m), (long long)(m)->index, (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 68cf270..8ee32b3 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -59,6 +59,40 @@ struct pcs_aio_req
u32 crcb[PCS_MAX_INLINE_CRC];
};
+#define PCS_MAX_ACCEL_CS 3
+
+struct pcs_accel_write_req
+{
+ int index;
+ struct kiocb iocb;
+ atomic_t iocount;
+ struct work_struct work;
+
+ /* Crypto bits. This holds an encrypted copy of original data for use by aio writes */
+ struct bio_vec *bvec_copy;
+ unsigned num_copy_bvecs;
+};
+
+struct pcs_accel_req
+{
+ struct pcs_int_request *parent;
+ atomic_t iocount;
+ int num_awr;
+ struct pcs_accel_write_req awr[PCS_MAX_ACCEL_CS];
+ int num_iotimes;
+ struct pcs_cs_sync_resp io_times[PCS_MAX_ACCEL_CS];
+ struct work_struct work;
+};
+
+struct pcs_iochunk_req {
+ struct pcs_msg msg;
+ struct pcs_cs_iohdr hbuf; /* Buffer for header.
+ * A little ugly
+ */
+ struct kvec hbuf_kv;
+ struct pcs_int_request *parent_N;
+};
+
struct pcs_int_request
{
struct pcs_cluster_core* cc;
@@ -84,6 +118,7 @@ struct pcs_int_request
#define IREQ_F_NO_ACCEL 0x1000
#define IREQ_F_CRYPT 0x2000
#define IREQ_F_ACCELERROR 0x4000
+#define IREQ_F_NOACCT 0x8000
atomic_t iocount;
@@ -141,8 +176,11 @@ struct pcs_int_request
* A little ugly
*/
struct kvec hbuf_kv;
+ struct pcs_int_request *parent_N;
};
+ struct pcs_iochunk_req ir;
struct pcs_aio_req ar;
+ struct pcs_accel_req acr;
};
} iochunk;
--
1.8.3.1
More information about the Devel
mailing list