[Devel] [PATCH RHEL7 COMMIT] fuse kio_pcs: ports from new user-space
Konstantin Khorenko
khorenko at virtuozzo.com
Wed May 23 11:04:18 MSK 2018
The commit is pushed to "branch-rh7-3.10.0-693.21.1.vz7.50.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.21.1.vz7.50.1
------>
commit 785731c50a55a452f5ee0aecc0fa438c7edfba59
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date: Wed May 23 11:04:18 2018 +0300
fuse kio_pcs: ports from new user-space
New token-based congestion avoidance is ported to the kernel.
Also, some bugs in fuse_kio_pcs congestion avoidance are fixed.
There was a race condition where cwnd could drop to 0 from another thread
and the congestion queue would get stuck. We now solve the race by moving
the cwnd check under cs->lock. If the cs turns out not to be congested
while cs->lock is held, the request is submitted immediately.
Signed-off-by: Alexey Kuznetsov <kuznet at virtuozzo.com>
Signed-off-by: Kirill Tkhai <ktkhai at virtuozzo.com>
---
fs/fuse/kio/pcs/pcs_cluster.c | 31 +++-
fs/fuse/kio/pcs/pcs_cs.c | 25 ++--
fs/fuse/kio/pcs/pcs_cs.h | 15 +-
fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 10 +-
fs/fuse/kio/pcs/pcs_map.c | 287 ++++++++++++++++++++++---------------
fs/fuse/kio/pcs/pcs_map.h | 7 +-
fs/fuse/kio/pcs/pcs_req.h | 15 ++
7 files changed, 241 insertions(+), 149 deletions(-)
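For illustration only: a minimal userspace model (plain C with pthreads, not the kernel code) of the locking pattern this patch adopts in pcs_cs_cong_enqueue_cond(). The congestion test and the enqueue happen atomically under the per-CS lock, so a concurrent completion cannot shrink in_flight between the check and the enqueue; if the CS is not congested, the function reports that and the caller submits the request right away. The struct layout and names below are simplified stand-ins, not the real pcs_cs definition.

/*
 * Simplified sketch of "check congestion under the lock, queue only if
 * still congested, otherwise let the caller submit immediately".
 */
#include <pthread.h>
#include <stdio.h>

struct request {
	struct request *next;
	int id;
};

struct cs {
	pthread_mutex_t lock;
	unsigned int in_flight;          /* data currently on the wire */
	unsigned int eff_cwnd;           /* effective congestion window */
	struct request *cong_head, *cong_tail;
	int cong_queue_len;
};

/* Userspace counterpart of pcs_cs_cong_enqueue_cond(): queue only if congested. */
static int cs_cong_enqueue_cond(struct cs *cs, struct request *req)
{
	int queued = 0;

	pthread_mutex_lock(&cs->lock);
	if (cs->in_flight >= cs->eff_cwnd) {
		req->next = NULL;
		if (cs->cong_tail)
			cs->cong_tail->next = req;
		else
			cs->cong_head = req;
		cs->cong_tail = req;
		cs->cong_queue_len++;
		queued = 1;
	}
	pthread_mutex_unlock(&cs->lock);
	return queued;
}

/* Caller: submit immediately unless the request was parked on the queue. */
static void submit(struct cs *cs, struct request *req)
{
	if (cs_cong_enqueue_cond(cs, req))
		return;                  /* will be re-woken on I/O completion */
	printf("request %d submitted immediately\n", req->id);
}

int main(void)
{
	struct cs cs = { .lock = PTHREAD_MUTEX_INITIALIZER,
			 .in_flight = 0, .eff_cwnd = 65536 };
	struct request r = { .id = 1 };

	submit(&cs, &r);        /* window open: goes out at once */
	cs.in_flight = 65536;   /* pretend the window is now full */
	submit(&cs, &r);        /* parked on the congestion queue instead */
	printf("queue length now %d\n", cs.cong_queue_len);
	return 0;
}

In the actual patch the same idea is extended with token requests (PCS_IREQ_TOKEN): when a write or flush is congested on several CSes at once, a dummy token is queued on each congested CS, and the parent request is resumed only after all tokens have been granted.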
diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 79071655f6e5..24ec8a5f39a3 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -18,6 +18,7 @@
#include "../../fuse_i.h"
void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq);
+static void ireq_process_(struct pcs_int_request *ireq);
static inline int is_file_inline(struct pcs_dentry_info *di)
{
@@ -226,6 +227,9 @@ static int fiemap_worker(void * arg)
sreq->dentry = di;
sreq->type = PCS_IREQ_IOCHUNK;
+ INIT_LIST_HEAD(&sreq->tok_list);
+ sreq->tok_reserved = 0;
+ sreq->tok_serno = 0;
sreq->iochunk.map = NULL;
sreq->iochunk.flow = pcs_flow_record(&di->mapping.ftab, 0, pos, end-pos, &di->cluster->maps.ftab);
sreq->iochunk.cmd = PCS_REQ_T_FIEMAP;
@@ -280,7 +284,7 @@ void pcs_cc_process_ireq_chunk(struct pcs_int_request *ireq)
pcs_map_put(ireq->iochunk.map);
ireq->iochunk.map = map;
- map_submit(map, ireq, 0);
+ map_submit(map, ireq);
}
/* TODO Remove noinline in production */
@@ -325,6 +329,9 @@ static noinline void __pcs_cc_process_ireq_rw(struct pcs_int_request *ireq)
sreq->dentry = di;
sreq->type = PCS_IREQ_IOCHUNK;
+ INIT_LIST_HEAD(&sreq->tok_list);
+ sreq->tok_reserved = 0;
+ sreq->tok_serno = 0;
sreq->iochunk.map = NULL;
sreq->iochunk.flow = pcs_flow_get(fl);
sreq->iochunk.cmd = ireq->apireq.req->type;
@@ -391,6 +398,25 @@ static void pcs_cc_process_ireq_ioreq(struct pcs_int_request *ireq)
return __pcs_cc_process_ireq_rw(ireq);
}
+static void process_ireq_token(struct pcs_int_request * ireq)
+{
+ struct pcs_int_request * parent = ireq->token.parent;
+
+ if (parent) {
+ int do_execute = 0;
+
+ spin_lock(&parent->completion_data.child_lock);
+ if (ireq->token.parent) {
+ ireq_drop_tokens(parent);
+ do_execute = 1;
+ }
+ spin_unlock(&parent->completion_data.child_lock);
+ if (do_execute)
+ ireq_process_(parent);
+ }
+ ireq_destroy(ireq);
+}
+
static void ireq_process_(struct pcs_int_request *ireq)
{
struct fuse_conn * fc = container_of(ireq->cc, struct pcs_fuse_cluster, cc)->fc;
@@ -425,6 +451,9 @@ static void ireq_process_(struct pcs_int_request *ireq)
case PCS_IREQ_CUSTOM:
ireq->custom.action(ireq);
break;
+ case PCS_IREQ_TOKEN:
+ process_ireq_token(ireq);
+ break;
default:
BUG();
break;
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 151b06cb18cf..cae591e65a42 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -694,8 +694,8 @@ void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
if (cs == NULL)
return;
+ list_splice_tail_init(&cs->active_list, &queue);
list_splice_tail_init(&cs->cong_queue, &queue);
- clear_bit(CS_SF_CONGESTED, &cs->state);
cs->cong_queue_len = 0;
cs_blacklist(cs, err->value, "notify error");
spin_unlock(&cs->lock);
@@ -710,9 +710,7 @@ static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose)
list_splice_tail_init(&cs->active_list, dispose);
list_splice_tail_init(&cs->cong_queue, dispose);
- cs->active_list_len = 0;
cs->cong_queue_len = 0;
- clear_bit(CS_SF_CONGESTED, &cs->state);
cs->is_dead = 1;
spin_lock(&cs->css->lock);
@@ -920,7 +918,7 @@ void cs_decrement_in_flight(struct pcs_cs *cs, unsigned int to_dec)
if (cs->in_flight < cs->eff_cwnd) {
cs->cwr_state = 0;
- pcs_cs_flush_cong_queue(cs);
+ pcs_cs_activate_cong_queue(cs);
}
if (cs->in_flight == 0)
cs->idle_stamp = jiffies;
@@ -1066,6 +1064,7 @@ void pcs_csset_init(struct pcs_cs_set *css)
INIT_DELAYED_WORK(&css->bl_work, bl_timer_work);
css->ncs = 0;
spin_lock_init(&css->lock);
+ atomic64_set(&css->csl_serno_gen, 0);
}
void pcs_csset_fini(struct pcs_cs_set *css)
@@ -1146,14 +1145,18 @@ void pcs_cs_set_stat_up(struct pcs_cs_set *set)
pcs_cs_for_each_entry(set, do_update_stat, 0);
}
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs)
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs)
{
+ int queued = 0;
+
spin_lock(&cs->lock);
- if (test_bit(CS_SF_CONGESTED, &cs->state))
- test_bit(CS_SF_CONGESTED, &cs->state);
- list_add_tail(&ireq->list, &cs->cong_queue);
- cs->cong_queue_len++;
- if (!ireq->qdepth)
- ireq->qdepth = cs->cong_queue_len + cs->active_list_len;
+ if (cs->in_flight >= cs->eff_cwnd) {
+ list_add_tail(&ireq->list, &cs->cong_queue);
+ cs->cong_queue_len++;
+ if (!ireq->qdepth)
+ ireq->qdepth = cs->cong_queue_len;
+ queued = 1;
+ }
spin_unlock(&cs->lock);
+ return queued;
}
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index f46b31f2633d..eb81ac51f3ae 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -36,7 +36,6 @@ enum {
CS_SF_FAILED,
CS_SF_BLACKLISTED,
CS_SF_ACTIVE,
- CS_SF_CONGESTED,
};
struct pcs_cs {
@@ -66,14 +65,13 @@ struct pcs_cs {
struct list_head cong_queue;
int cong_queue_len;
struct list_head active_list;
- int active_list_len;
pcs_cs_io_prio_t io_prio;
pcs_cs_net_prio_t net_prio;
u8 mds_flags;
abs_time_t io_prio_stamp;
- struct list_head flow_lru;
+ struct list_head flow_lru;
int nflows;
unsigned long state;
@@ -104,24 +102,20 @@ static inline void pcs_cs_init_cong_queue(struct pcs_cs *cs)
{
INIT_LIST_HEAD(&cs->cong_queue);
cs->cong_queue_len = 0;
- clear_bit(CS_SF_CONGESTED, &cs->state);
}
static inline void pcs_cs_init_active_list(struct pcs_cs *cs)
{
INIT_LIST_HEAD(&cs->active_list);
- cs->active_list_len = 0;
}
-static inline void pcs_cs_flush_cong_queue(struct pcs_cs *cs)
+static inline void pcs_cs_activate_cong_queue(struct pcs_cs *cs)
{
assert_spin_locked(&cs->lock);
- list_splice_tail(&cs->cong_queue, &cs->active_list);
- cs->active_list_len += cs->cong_queue_len;
- pcs_cs_init_cong_queue(cs);
+ list_splice_tail_init(&cs->cong_queue, &cs->active_list);
}
-void pcs_cs_cong_enqueue(struct pcs_int_request *ireq, struct pcs_cs *cs);
+int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs);
#define PCS_CS_HASH_SIZE 1024
@@ -132,6 +126,7 @@ struct pcs_cs_set {
struct delayed_work bl_work;
unsigned int ncs;
spinlock_t lock;
+ atomic64_t csl_serno_gen;
};
void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq);
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index f4c0fb15403e..61378f0d9a58 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -843,11 +843,6 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
if (inarg->offset >= di->fileinfo.attr.size)
inarg->mode &= ~FALLOC_FL_ZERO_RANGE;
- if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
- if (inarg->offset + inarg->length > di->fileinfo.attr.size)
- inarg->length = di->fileinfo.attr.size - inarg->offset;
- }
-
if (inarg->mode & (FALLOC_FL_ZERO_RANGE|FALLOC_FL_PUNCH_HOLE)) {
if ((inarg->offset & (PAGE_SIZE - 1)) || (inarg->length & (PAGE_SIZE - 1))) {
r->req.out.h.error = -EINVAL;
@@ -855,6 +850,11 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req *req,
}
}
+ if (inarg->mode & FALLOC_FL_KEEP_SIZE) {
+ if (inarg->offset + inarg->length > di->fileinfo.attr.size)
+ inarg->length = di->fileinfo.attr.size - inarg->offset;
+ }
+
ret = pcs_fuse_prep_rw(r);
if (!ret)
goto submit;
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 085f49fd0219..90e311f7e995 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -911,8 +911,8 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
atomic_set(&cs_list->refcnt, 1);
atomic_set(&cs_list->seq_read_in_flight, 0);
cs_list->read_index = -1;
- cs_list->cong_index = -1 ;
cs_list->flags = 0;
+ cs_list->serno = atomic64_inc_return(&css->csl_serno_gen);
cs_list->blacklist = 0;
cs_list->read_timeout = (read_tout * HZ) / 1000;
cs_list->write_timeout = (write_tout * HZ) / 1000;
@@ -1380,7 +1380,7 @@ static void pcs_cs_deaccount(struct pcs_int_request *ireq, struct pcs_cs * cs, i
spin_unlock(&cs->lock);
}
-static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
+static void pcs_cs_wakeup(struct pcs_cs * cs)
{
struct pcs_int_request * sreq;
struct pcs_map_entry * map;
@@ -1393,11 +1393,32 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
break;
}
sreq = list_first_entry(&cs->active_list, struct pcs_int_request, list);
- BUG_ON(!cs->active_list_len);
list_del_init(&sreq->list);
- cs->active_list_len--;
+ cs->cong_queue_len--;
spin_unlock(&cs->lock);
+ if (sreq->type == PCS_IREQ_TOKEN) {
+ struct pcs_int_request * parent = sreq->token.parent;
+ int do_execute = 0;
+
+ if (parent == NULL) {
+ ireq_destroy(sreq);
+ continue;
+ }
+
+ spin_lock(&parent->completion_data.child_lock);
+ if (sreq->token.parent) {
+ parent->tok_reserved |= (1ULL << sreq->token.cs_index);
+ list_del(&sreq->token.tok_link);
+ do_execute = list_empty(&parent->tok_list);
+ }
+ spin_unlock(&parent->completion_data.child_lock);
+ ireq_destroy(sreq);
+ if (!do_execute)
+ continue;
+ sreq = parent;
+ }
+
if (sreq->type != PCS_IREQ_FLUSH) {
map = pcs_find_get_map(sreq->dentry, sreq->iochunk.chunk +
((sreq->flags & IREQ_F_MAPPED) ? 0 : sreq->iochunk.offset));
@@ -1412,7 +1433,7 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
preq->apireq.req->pos, preq->apireq.req->size,
&sreq->cc->maps.ftab);
}
- map_submit(map, sreq, requeue);
+ map_submit(map, sreq);
} else {
map_queue_on_limit(sreq);
}
@@ -1422,58 +1443,33 @@ static void pcs_cs_wakeup(struct pcs_cs * cs, int requeue)
pcs_clear_error(&sreq->error);
ireq_complete(sreq);
} else
- map_submit(map, sreq, requeue);
+ map_submit(map, sreq);
}
}
}
static int __pcs_cs_still_congested(struct pcs_cs * cs)
{
+ if (!list_empty(&cs->active_list))
+ list_splice_tail_init(&cs->active_list, &cs->cong_queue);
- assert_spin_locked(&cs->lock);
-
- if (!list_empty(&cs->active_list)) {
- BUG_ON(!cs->active_list_len);
- list_splice_tail(&cs->active_list, &cs->cong_queue);
- cs->cong_queue_len += cs->active_list_len;
- set_bit(CS_SF_CONGESTED, &cs->state);
- pcs_cs_init_active_list(cs);
- } else if (list_empty(&cs->cong_queue)) {
+ if (list_empty(&cs->cong_queue)) {
BUG_ON(cs->cong_queue_len);
- BUG_ON(test_bit(CS_SF_CONGESTED, &cs->state));
return 0;
- } else {
- BUG_ON(cs->active_list_len);
}
- if (cs->in_flight >= cs->eff_cwnd)
- return 0;
-
- /* Exceptional situation: CS is not congested, but still has congestion queue.
- * This can happen f.e. when CS was congested with reads and has some writes in queue,
- * then all reads are complete, but writes cannot be sent because of congestion
- * on another CSes in chain. This is absolutely normal, we just should queue
- * not on this CS, but on actualle congested CSes. With current algorithm of preventing
- * reordering, we did a mistake and queued on node which used to be congested.
- * Solution for now is to retry sending with flag "requeue" set, it will requeue
- * requests on another nodes. It is difficult to say how frequently this happens,
- * so we spit out message. If we will have lots of them in logs, we have to select
- * different solution.
- */
-
- TRACE("CS#" NODE_FMT " is free, but still has queue", NODE_ARGS(cs->id));
- pcs_cs_flush_cong_queue(cs);
-
- return 1;
+ return cs->in_flight < cs->eff_cwnd;
}
+
static int pcs_cs_still_congested(struct pcs_cs * cs)
{
- int ret;
+ int res;
spin_lock(&cs->lock);
- ret = __pcs_cs_still_congested(cs);
+ res = __pcs_cs_still_congested(cs);
spin_unlock(&cs->lock);
- return ret;
+
+ return res;
}
void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
@@ -1518,7 +1514,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
if (ireq->type == PCS_IREQ_FLUSH || (pcs_req_direction(ireq->iochunk.cmd) && !(ireq->flags & IREQ_F_MAPPED))) {
int i;
- int requeue = 0;
+ int requeue;
for (i = csl->nsrv - 1; i >= 0; i--) {
if (!match_id || csl->cs[i].cslink.cs->id.val == match_id)
@@ -1538,14 +1534,14 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
do {
for (i = csl->nsrv - 1; i >= 0; i--)
- pcs_cs_wakeup(csl->cs[i].cslink.cs, requeue);
+ pcs_cs_wakeup(csl->cs[i].cslink.cs);
requeue = 0;
for (i = csl->nsrv - 1; i >= 0; i--)
- requeue += pcs_cs_still_congested(csl->cs[i].cslink.cs);
+ requeue |= pcs_cs_still_congested(csl->cs[i].cslink.cs);
} while (requeue);
} else {
- int requeue = 0;
+ int requeue;
struct pcs_cs * rcs = csl->cs[ireq->iochunk.cs_index].cslink.cs;
if (ireq->flags & IREQ_F_SEQ_READ) {
@@ -1557,7 +1553,7 @@ void pcs_deaccount_ireq(struct pcs_int_request *ireq, pcs_error_t * err)
pcs_cs_deaccount(ireq, rcs, error);
do {
- pcs_cs_wakeup(rcs, requeue);
+ pcs_cs_wakeup(rcs);
requeue = pcs_cs_still_congested(rcs);
} while (requeue);
@@ -1771,6 +1767,10 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
sreq->iochunk.map = ireq->iochunk.map;
if (sreq->iochunk.map)
__pcs_map_get(sreq->iochunk.map);
+ INIT_LIST_HEAD(&sreq->tok_list);
+ BUG_ON(!list_empty(&ireq->tok_list));
+ sreq->tok_reserved = ireq->tok_reserved;
+ sreq->tok_serno = ireq->tok_serno;
sreq->iochunk.flow = pcs_flow_get(ireq->iochunk.flow);
sreq->iochunk.cmd = ireq->iochunk.cmd;
sreq->iochunk.role = ireq->iochunk.role;
@@ -1803,7 +1803,7 @@ pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign)
return sreq;
}
-static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
{
struct pcs_cluster_core *cc = ireq->cc;
struct pcs_cs * cs;
@@ -1872,9 +1872,8 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
spin_unlock(&cs->lock);
if (allot < 0) {
- pcs_cs_cong_enqueue(ireq, cs);
-
- return 0;
+ if (pcs_cs_cong_enqueue_cond(ireq, cs))
+ return 0;
}
if (allot < ireq->dentry->cluster->cfg.curr.lmss)
@@ -1924,77 +1923,131 @@ static int pcs_cslist_submit_read(struct pcs_int_request *ireq, struct pcs_cs_li
return 0;
if (allot < 0) {
- pcs_cs_cong_enqueue(ireq, cs);
- return 0;
+ if (pcs_cs_cong_enqueue_cond(ireq, cs))
+ return 0;
}
}
}
-static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int ireq_queue_tokens(struct pcs_int_request * ireq, struct pcs_cs_list * csl)
+{
+ int i;
+ int queued = 0;
+ struct list_head drop;
+ struct pcs_int_request * toks[csl->nsrv];
+
+ INIT_LIST_HEAD(&drop);
+
+ for (i = 0; i < csl->nsrv; i++) {
+ struct pcs_int_request * ntok;
+
+ /* ireq is private; no need to lock tok_* fields */
+
+ if (ireq->tok_reserved & (1ULL << i)) {
+ toks[i] = NULL;
+ continue;
+ }
+
+ ntok = ireq_alloc(ireq->dentry);
+ BUG_ON(!ntok);
+ ntok->type = PCS_IREQ_TOKEN;
+ ntok->token.parent = ireq;
+ ntok->token.cs_index = i;
+ toks[i] = ntok;
+ }
+
+ /* Publish tokens in CS queues */
+ spin_lock(&ireq->completion_data.child_lock);
+ for (i = 0; i < csl->nsrv; i++) {
+ if (toks[i]) {
+ struct pcs_cs * cs = csl->cs[i].cslink.cs;
+ if (pcs_cs_cong_enqueue_cond(toks[i], cs)) {
+ list_add(&toks[i]->token.tok_link, &ireq->tok_list);
+ toks[i] = NULL;
+ queued = 1;
+ } else {
+ list_add(&toks[i]->token.tok_link, &drop);
+ }
+ }
+ }
+ spin_unlock(&ireq->completion_data.child_lock);
+
+ while (!list_empty(&drop)) {
+ struct pcs_int_request * tok = list_first_entry(&drop, struct pcs_int_request, token.tok_link);
+ list_del(&tok->token.tok_link);
+ ireq_destroy(tok);
+ }
+ return queued;
+}
+
+void ireq_drop_tokens(struct pcs_int_request * ireq)
+{
+ assert_spin_locked(&ireq->completion_data.child_lock);
+
+ while (!list_empty(&ireq->tok_list)) {
+ struct pcs_int_request * tok = list_first_entry(&ireq->tok_list, struct pcs_int_request, token.tok_link);
+ tok->token.parent = NULL;
+ list_del(&tok->token.tok_link);
+ }
+}
+
+static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
{
struct pcs_cs * cs;
unsigned int iochunk;
int i;
- int congested_idx;
- int max_excess;
int allot;
+ struct pcs_cs * congested_cs = NULL;
+ u64 congested = 0;
ireq->iochunk.cs_index = 0;
iochunk = ireq->dentry->cluster->cfg.curr.lmss;
restart:
- congested_idx = -1;
- max_excess = 0;
allot = ireq->iochunk.size;
+ if (csl->serno != ireq->tok_serno)
+ ireq->tok_reserved = 0;
+ BUG_ON(!list_empty(&ireq->tok_list));
for (i = 0; i < csl->nsrv; i++) {
- int cs_allot;
-
cs = csl->cs[i].cslink.cs;
if (cs_is_blacklisted(cs)) {
map_remote_error(ireq->iochunk.map, cs->blacklist_reason, cs->id.val);
TRACE("Write to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
MAP_ARGS(ireq->iochunk.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+ spin_lock(&ireq->completion_data.child_lock);
+ ireq_drop_tokens(ireq);
+ spin_unlock(&ireq->completion_data.child_lock);
return -1;
}
spin_lock(&cs->lock);
cs_cwnd_use_or_lose(cs);
- cs_allot = cs->eff_cwnd - cs->in_flight;
spin_unlock(&cs->lock);
- if (cs_allot < 0) {
- cs_allot = -cs_allot;
- if (cs_allot > max_excess) {
- congested_idx = i;
- max_excess = cs_allot;
- }
- } else {
- if (cs_allot < allot)
- allot = cs_allot;
- }
+ if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+ congested_cs = cs;
+ congested |= (1ULL << i);
+ } else
+ ireq->tok_reserved |= (1ULL << i);
if (!(test_bit(CS_SF_LOCAL, &cs->state)))
iochunk = ireq->dentry->cluster->cfg.curr.wmss;
}
- if (congested_idx >= 0) {
- int cur_cong_idx = READ_ONCE(csl->cong_index);
+ if (allot < ireq->dentry->cluster->cfg.curr.lmss)
+ allot = ireq->dentry->cluster->cfg.curr.lmss;
+ if (congested) {
+ int queued;
- if (cur_cong_idx >= 0 && !requeue &&
- (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
- READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
- congested_idx = cur_cong_idx;
+ ireq->tok_serno = csl->serno;
+ if (congested & (congested - 1))
+ queued = ireq_queue_tokens(ireq, csl);
else
- WRITE_ONCE(csl->cong_index, congested_idx);
-
- pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
- return 0;
+ queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+ if (queued)
+ return 0;
}
- WRITE_ONCE(csl->cong_index, -1);
-
- if (allot < ireq->dentry->cluster->cfg.curr.lmss)
- allot = ireq->dentry->cluster->cfg.curr.lmss;
for (;;) {
struct pcs_int_request * sreq = ireq;
@@ -2048,61 +2101,55 @@ static int pcs_cslist_submit_write(struct pcs_int_request *ireq, struct pcs_cs_l
}
}
-static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl, int requeue)
+static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_list * csl)
{
struct pcs_cs * cs;
int i;
- int congested_idx;
- int max_excess;
int allot = PCS_CS_FLUSH_WEIGHT;
struct pcs_msg * msg;
struct pcs_cs_iohdr * ioh;
+ u64 congested = 0;
+ struct pcs_cs * congested_cs = NULL;
- congested_idx = -1;
- max_excess = 0;
+ if (csl->serno != ireq->tok_serno)
+ ireq->tok_reserved = 0;
+ BUG_ON(!list_empty(&ireq->tok_list));
for (i = 0; i < csl->nsrv; i++) {
- int cs_allot;
-
cs = csl->cs[i].cslink.cs;
if (cs_is_blacklisted(cs)) {
map_remote_error(ireq->flushreq.map, cs->blacklist_reason, cs->id.val);
TRACE("Flush to " MAP_FMT " blocked by blacklist error %d, CS" NODE_FMT,
MAP_ARGS(ireq->flushreq.map), cs->blacklist_reason, NODE_ARGS(cs->id));
+ spin_lock(&ireq->completion_data.child_lock);
+ ireq_drop_tokens(ireq);
+ spin_unlock(&ireq->completion_data.child_lock);
return -1;
}
spin_lock(&cs->lock);
cs_cwnd_use_or_lose(cs);
- cs_allot = cs->eff_cwnd - cs->in_flight;
spin_unlock(&cs->lock);
-
- if (cs_allot < 0) {
- cs_allot = -cs_allot;
- if (cs_allot > max_excess) {
- congested_idx = i;
- max_excess = cs_allot;
- }
- }
+ if (cs->in_flight > cs->eff_cwnd && !(ireq->tok_reserved & (1ULL << i))) {
+ congested_cs = cs;
+ congested |= (1ULL << i);
+ } else
+ ireq->tok_reserved |= (1ULL << i);
}
- if (congested_idx >= 0) {
- int cur_cong_idx = READ_ONCE(csl->cong_index);
+ if (congested) {
+ int queued;
- if (cur_cong_idx >= 0 && !requeue &&
- (READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->cong_queue_len) ||
- READ_ONCE(csl->cs[cur_cong_idx].cslink.cs->active_list_len)))
- congested_idx = cur_cong_idx;
+ ireq->tok_serno = csl->serno;
+ if (congested & (congested - 1))
+ queued = ireq_queue_tokens(ireq, csl);
else
- WRITE_ONCE(csl->cong_index, congested_idx);
-
- pcs_cs_cong_enqueue(ireq, csl->cs[congested_idx].cslink.cs);
- return 0;
+ queued = pcs_cs_cong_enqueue_cond(ireq, congested_cs);
+ if (queued)
+ return 0;
}
- WRITE_ONCE(csl->cong_index, -1);
-
for (i = 0; i < csl->nsrv; i++) {
cs = csl->cs[i].cslink.cs;
cs_increment_in_flight(cs, allot);
@@ -2137,25 +2184,25 @@ static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_l
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue)
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
{
BUG_ON(!atomic_read(&csl->refcnt));
if (ireq->type == PCS_IREQ_FLUSH) {
- return pcs_cslist_submit_flush(ireq, csl, requeue);
+ return pcs_cslist_submit_flush(ireq, csl);
} else if (!pcs_req_direction(ireq->iochunk.cmd)) {
- return pcs_cslist_submit_read(ireq, csl, requeue);
+ return pcs_cslist_submit_read(ireq, csl);
} else if (ireq->flags & IREQ_F_MAPPED) {
BUG();
return -EIO;
} else {
- return pcs_cslist_submit_write(ireq, csl, requeue);
+ return pcs_cslist_submit_write(ireq, csl);
}
BUG();
return -EIO;
}
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue)
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
{
int direction;
int done;
@@ -2235,7 +2282,7 @@ void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requ
if (direction && ireq->type != PCS_IREQ_FLUSH)
ireq->dentry->local_mtime = get_real_time_ms();
- done = !pcs_cslist_submit(ireq, csl, requeue);
+ done = !pcs_cslist_submit(ireq, csl);
cslist_put(csl);
} while (!done);
}
@@ -2690,7 +2737,7 @@ void process_flush_req(struct pcs_int_request *ireq)
goto done;
}
spin_unlock(&m->lock);
- map_submit(m, ireq, 0);
+ map_submit(m, ireq);
return;
done:
@@ -2888,6 +2935,8 @@ static int prepare_map_flush_ireq(struct pcs_map_entry *m, struct pcs_int_reques
}
prepare_map_flush_msg(m, sreq, msg);
sreq->type = PCS_IREQ_FLUSH;
+ INIT_LIST_HEAD(&sreq->tok_list);
+ sreq->tok_reserved = 0;
sreq->ts = ktime_get();
sreq->completion_data.parent = NULL;
sreq->flushreq.map = m;
@@ -2922,7 +2971,7 @@ static void sync_timer_work(struct work_struct *w)
map_sync_work_add(m, HZ);
} else {
if (sreq)
- map_submit(m, sreq, 0);
+ map_submit(m, sreq);
}
/* Counter part from map_sync_work_add */
pcs_map_put(m);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 11176b2b80d5..c60e72b365d3 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -85,13 +85,13 @@ struct pcs_cs_list
atomic_t refcnt;
atomic_t seq_read_in_flight;
int read_index; /* volatile read hint */
- int cong_index; /* volatile cong hint */
unsigned long blacklist; /* Atomic bit field */
abs_time_t blacklist_expires; /* volatile blacklist stamp */
abs_time_t select_stamp; /* volatile read hint stamp */
/* members below are immutable accross cslist life time */
#define CSL_FL_HAS_LOCAL 1
unsigned int flags;
+ u64 serno;
int read_timeout;
int write_timeout;
int nsrv;
@@ -165,7 +165,7 @@ void pcs_mapping_truncate(struct pcs_int_request *ireq, u64 old_size);
void process_ireq_truncate(struct pcs_int_request *ireq);
struct pcs_map_entry * pcs_find_get_map(struct pcs_dentry_info * de, u64 chunk);
-void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq, int requeue);
+void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq);
void map_notify_iochunk_error(struct pcs_int_request *ireq);
void map_notify_soft_error(struct pcs_int_request *ireq);
void __pcs_map_put(struct pcs_map_entry *m);
@@ -182,7 +182,7 @@ void pcs_map_verify_sync_state(struct pcs_dentry_info * de, struct pcs_int_reque
void map_inject_flush_req(struct pcs_int_request *ireq);
void process_flush_req(struct pcs_int_request *ireq);
int map_check_limit(struct pcs_map_entry * map, struct pcs_int_request *ireq);
-int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl, int requeue);
+int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl);
struct pcs_int_request * pcs_ireq_split(struct pcs_int_request *ireq, unsigned int iochunk, int noalign);
int fuse_map_resolve(struct pcs_map_entry * m, int direction);
struct pcs_ioc_getmap;
@@ -190,6 +190,7 @@ void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap);
int pcs_map_encode_req(struct pcs_map_entry*m, struct pcs_ioc_getmap *map, int direction);
void map_truncate_tail(struct pcs_mapping *mapping, u64 offset);
unsigned long pcs_map_shrink_scan(struct shrinker *, struct shrink_control *sc);
+void ireq_drop_tokens(struct pcs_int_request * ireq);
#define MAP_FMT "(%p) 0x%lld s:%x" DENTRY_FMT
#define MAP_ARGS(m) (m), (long long)(m)->index, (m)->state, DENTRY_ARGS(pcs_dentry_from_map((m)))
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index fba74e9c4a56..6f49018e3988 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -28,6 +28,8 @@ enum
PCS_IREQ_CUSTOM = 16, /* generic request */
PCS_IREQ_WRAID = 17, /* compound raid6 write request */
PCS_IREQ_RRAID = 18, /* compound raid6 read request */
+ PCS_IREQ_GETMAP = 19, /* get mapping for kdirect mode */
+ PCS_IREQ_TOKEN = 20, /* dummy token to allocate congestion window */
PCS_IREQ_KAPI = 65 /* IO request from kernel API */
};
@@ -86,6 +88,13 @@ struct pcs_int_request
*/
struct work_struct worker;
+ /* The following tok_* fields are sequenced by completion_data.child_lock
+ * NOTE: cs->lock can be taken under this lock.
+ */
+ struct list_head tok_list;
+ u64 tok_reserved;
+ u64 tok_serno;
+
union {
struct {
struct pcs_map_entry *map;
@@ -112,6 +121,12 @@ struct pcs_int_request
struct pcs_msg *msg;
} flushreq;
+ struct {
+ struct pcs_int_request *parent;
+ struct list_head tok_link;
+ int cs_index;
+ } token;
+
struct {
u64 offset;
int phase;