[Devel] [PATCH vz9 v1 38/63] dm-ploop: introduce pio runner threads
Alexander Atanasov
alexander.atanasov at virtuozzo.com
Fri Jan 24 18:36:12 MSK 2025
Create threads to execute pios in parallel - call them pio runners.
Use the number of CPUs to determine the number of threads started.
From the worker, each pio is sent to a thread in round-robin fashion
through work_llist. Maintain the number of pios sent so we can wait
for them to be processed - NB: we only want to keep the order of
execution of different pio types, which can differ from
the order of their completion. We send a batch of pios to the runners
and, if necessary, we wait for them to be processed before moving
forward - we need this for metadata writeback and flushes.
https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
---
drivers/md/dm-ploop-map.c | 136 ++++++++++++++++++++++++++++++++---
drivers/md/dm-ploop-target.c | 44 ++++++++++--
drivers/md/dm-ploop.h | 13 +++-
3 files changed, 175 insertions(+), 18 deletions(-)
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index f59889d2cd89..b820e9ba1218 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -20,6 +20,8 @@
#include "dm-ploop.h"
#include "dm-rq.h"
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+
#define PREALLOC_SIZE (128ULL * 1024 * 1024)
static void ploop_handle_cleanup(struct ploop *ploop, struct pio *pio);
@@ -1894,6 +1896,11 @@ static void ploop_process_resubmit_pios(struct ploop *ploop,
}
}
+static inline int ploop_runners_have_pending(struct ploop *ploop)
+{
+ return atomic_read(&ploop->kt_worker->inflight_pios);
+}
+
static int ploop_submit_metadata_writeback(struct ploop *ploop)
{
unsigned long flags;
@@ -1958,6 +1965,33 @@ static void process_ploop_fsync_work(struct ploop *ploop, struct llist_node *llf
}
}
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio)
+{
+ struct ploop_worker *wrkr;
+
+ wrkr = READ_ONCE(ploop->last_used_runner)->next;
+ WRITE_ONCE(ploop->last_used_runner, wrkr);
+
+ atomic_inc(&ploop->kt_worker->inflight_pios);
+ llist_add((struct llist_node *)(&pio->list), &wrkr->work_llist);
+ wake_up_process(wrkr->task);
+
+ return 0;
+}
+
+static inline int ploop_runners_add_work_list(struct ploop *ploop, struct llist_node *list)
+{
+ struct llist_node *pos, *t;
+ struct pio *pio;
+
+ llist_for_each_safe(pos, t, list) {
+ pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+ ploop_runners_add_work(ploop, pio);
+ }
+
+ return 0;
+}
+
void do_ploop_run_work(struct ploop *ploop)
{
LLIST_HEAD(deferred_pios);
@@ -2017,30 +2051,110 @@ void do_ploop_work(struct work_struct *ws)
do_ploop_run_work(ploop);
}
-int ploop_worker(void *data)
+int ploop_pio_runner(void *data)
{
struct ploop_worker *worker = data;
struct ploop *ploop = worker->ploop;
+ struct llist_node *llwork;
+ struct pio *pio;
+ struct llist_node *pos, *t;
+ unsigned int old_flags = current->flags;
+ int did_process_pios = 0;
for (;;) {
+ current->flags = old_flags;
set_current_state(TASK_INTERRUPTIBLE);
- if (kthread_should_stop()) {
- __set_current_state(TASK_RUNNING);
- break;
+check_for_more:
+ llwork = llist_del_all(&worker->work_llist);
+ if (!llwork) {
+ if (did_process_pios) {
+ did_process_pios = 0;
+ wake_up_interruptible(&ploop->dispatcher_wq_data);
+ }
+ /* Only stop when there is no more pios */
+ if (kthread_should_stop()) {
+ __set_current_state(TASK_RUNNING);
+ break;
+ }
+ schedule();
+ continue;
}
+ __set_current_state(TASK_RUNNING);
+ old_flags = current->flags;
+ current->flags |= PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
+
+ llist_for_each_safe(pos, t, llwork) {
+ pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+ INIT_LIST_HEAD(&pio->list);
+ switch (pio->queue_list_id) {
+ case PLOOP_LIST_FLUSH:
+ WARN_ON_ONCE(1); /* We must not see flushes here */
+ break;
+ case PLOOP_LIST_PREPARE:
+ // fsync pios can come here for endio
+ // XXX: make it a FSYNC list
+ ploop_pio_endio(pio);
+ break;
+ case PLOOP_LIST_DEFERRED:
+ ploop_process_one_deferred_bio(ploop, pio);
+ break;
+ case PLOOP_LIST_COW:
+ ploop_process_one_delta_cow(ploop, pio);
+ break;
+ case PLOOP_LIST_DISCARD:
+ ploop_process_one_discard_pio(ploop, pio);
+ break;
+ // XXX: make it list MDWB
+ case PLOOP_LIST_INVALID: /* resubmit sets the list id to invalid */
+ ploop_submit_rw_mapped(ploop, pio);
+ break;
+ default:
+ WARN_ON_ONCE(1);
+ }
+ atomic_dec(&ploop->kt_worker->inflight_pios);
+ }
+ cond_resched();
+ did_process_pios = 1;
+ goto check_for_more;
+ }
+ return 0;
+}
+
+int ploop_worker(void *data)
+{
+ struct ploop_worker *worker = data;
+ struct ploop *ploop = worker->ploop;
+
+ for (;;) {
+ set_current_state(TASK_INTERRUPTIBLE);
+
if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) &&
- llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) &&
- llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
- llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
- llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
- llist_empty(&ploop->llresubmit_pios)
- )
+ llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) &&
+ llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
+ llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
+ llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
+ llist_empty(&ploop->llresubmit_pios) &&
+ !ploop->force_md_writeback) {
+ if (kthread_should_stop()) {
+ wait_event_interruptible(ploop->dispatcher_wq_data,
+ (!ploop_runners_have_pending(ploop)));
+ __set_current_state(TASK_RUNNING);
+ break;
+ }
schedule();
+ /* now check for pending work */
+ }
__set_current_state(TASK_RUNNING);
do_ploop_run_work(ploop);
- cond_resched();
+ cond_resched(); /* give other processes chance to run */
+ if (kthread_should_stop()) {
+ wait_event_interruptible(ploop->dispatcher_wq_data,
+ (!ploop_runners_have_pending(ploop)));
+ __set_current_state(TASK_RUNNING);
+ break;
+ }
}
return 0;
}
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index dc63c18cece8..3fed26137831 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -164,6 +164,7 @@ static void ploop_destroy(struct ploop *ploop)
int i;
if (ploop->kt_worker) {
+ ploop->force_md_writeback = 1;
wake_up_process(ploop->kt_worker->task);
/* try to send all pending - if we have partial io and enospc end bellow */
while (!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) ||
@@ -175,9 +176,22 @@ static void ploop_destroy(struct ploop *ploop)
schedule();
}
+ if (ploop->kt_runners) {
+ for (i = 0; i < ploop->nkt_runners; i++) {
+ if (ploop->kt_runners[i]) {
+ wake_up_process(ploop->kt_runners[i]->task);
+ kthread_stop(ploop->kt_runners[i]->task);
+ kfree(ploop->kt_runners[i]);
+ }
+ }
+ }
+
kthread_stop(ploop->kt_worker->task); /* waits for the thread to stop */
+
WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]));
WARN_ON(!llist_empty(&ploop->llresubmit_pios));
+ WARN_ON(!llist_empty(&ploop->enospc_pios));
+ kfree(ploop->kt_runners);
kfree(ploop->kt_worker);
}
@@ -347,7 +361,8 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO);
argv++; \
} while (0);
-static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
+static struct ploop_worker *ploop_worker_create(struct ploop *ploop,
+ int (*worker_fn)(void *), const char *pref, int id)
{
struct ploop_worker *worker;
struct task_struct *task;
@@ -357,12 +372,13 @@ static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
return NULL;
worker->ploop = ploop;
- task = kthread_create(ploop_worker, worker, "ploop-%d-0",
- current->pid);
+ task = kthread_create(worker_fn, worker, "ploop-%d-%s-%d",
+ current->pid, pref, id);
if (IS_ERR(task))
goto out_err;
worker->task = task;
+ init_llist_head(&worker->work_llist);
wake_up_process(task);
@@ -521,10 +537,30 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
goto err;
- ploop->kt_worker = ploop_worker_create(ploop);
+ init_waitqueue_head(&ploop->dispatcher_wq_data);
+
+ ploop->kt_worker = ploop_worker_create(ploop, ploop_worker, "d", 0);
if (!ploop->kt_worker)
goto err;
+/* make it a param = either module or cpu based or dev req queue */
+#define PLOOP_PIO_RUNNERS nr_cpu_ids
+ ploop->kt_runners = kcalloc(PLOOP_PIO_RUNNERS, sizeof(struct kt_worker *), GFP_KERNEL);
+ if (!ploop->kt_runners)
+ goto err;
+
+ ploop->nkt_runners = PLOOP_PIO_RUNNERS;
+ for (i = 0; i < ploop->nkt_runners; i++) {
+ ploop->kt_runners[i] = ploop_worker_create(ploop, ploop_pio_runner, "r", i+1);
+ if (!ploop->kt_runners[i])
+ goto err;
+ }
+
+ for (i = 0; i < ploop->nkt_runners-1; i++)
+ ploop->kt_runners[i]->next = ploop->kt_runners[i+1];
+ ploop->kt_runners[ploop->nkt_runners-1]->next = ploop->kt_runners[0];
+ ploop->last_used_runner = ploop->kt_runners[0];
+
ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
if (ret)
goto err;
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 6f1b1e284fc3..452fc4a6c58f 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -146,14 +146,17 @@ enum {
struct ploop_worker {
struct ploop *ploop;
struct task_struct *task;
- u64 kcov_handle;
+ struct llist_head work_llist;
+ atomic_t inflight_pios;
+ struct ploop_worker *next;
};
struct ploop {
+ struct wait_queue_head dispatcher_wq_data;
struct dm_target *ti;
#define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */
mempool_t *prq_pool;
-#define PLOOP_PIO_POOL_SIZE 256
+#define PLOOP_PIO_POOL_SIZE 512
mempool_t *pio_pool;
struct rb_root bat_entries;
@@ -198,7 +201,10 @@ struct ploop {
struct work_struct worker;
struct work_struct event_work;
- struct ploop_worker *kt_worker;
+ struct ploop_worker *kt_worker; /* dispatcher thread */
+ struct ploop_worker **kt_runners; /* pio runners */
+ unsigned int nkt_runners;
+ struct ploop_worker *last_used_runner;
struct completion inflight_bios_ref_comp;
struct percpu_ref inflight_bios_ref[2];
bool inflight_ref_comp_pending;
@@ -608,6 +614,7 @@ extern void ploop_enospc_timer(struct timer_list *timer);
extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int whence);
extern int ploop_worker(void *data);
+extern int ploop_pio_runner(void *data);
extern void ploop_disable_writeback_delay(struct ploop *ploop);
extern void ploop_enable_writeback_delay(struct ploop *ploop);
--
2.43.0
More information about the Devel
mailing list