[Devel] [PATCH vz9 v1 38/63] dm-ploop: introduce pio runner threads

Alexander Atanasov alexander.atanasov at virtuozzo.com
Fri Jan 24 18:36:12 MSK 2025


Create threads to execute pios in parallel and call them pio runners.
Use the number of CPUs to determine how many threads are started.
From the dispatcher worker, each pio is sent to a runner thread in
round-robin fashion through the runner's work_llist. Keep count of the
pios sent so we can wait for them to be processed. NB: we only need to
preserve the order of execution of the different pio types, which can
differ from the order of their completion. We send a batch of pios to
the runners and, if necessary, wait for them to be processed before
moving forward; this is needed for metadata writeback and flushes.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
---
 drivers/md/dm-ploop-map.c    | 136 ++++++++++++++++++++++++++++++++---
 drivers/md/dm-ploop-target.c |  44 ++++++++++--
 drivers/md/dm-ploop.h        |  13 +++-
 3 files changed, 175 insertions(+), 18 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index f59889d2cd89..b820e9ba1218 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -20,6 +20,8 @@
 #include "dm-ploop.h"
 #include "dm-rq.h"
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+
 #define PREALLOC_SIZE (128ULL * 1024 * 1024)
 
 static void ploop_handle_cleanup(struct ploop *ploop, struct pio *pio);
@@ -1894,6 +1896,11 @@ static void ploop_process_resubmit_pios(struct ploop *ploop,
 	}
 }
 
+static inline int ploop_runners_have_pending(struct ploop *ploop)
+{
+	return atomic_read(&ploop->kt_worker->inflight_pios); /* pios queued to runners, not yet processed */
+}
+
 static int ploop_submit_metadata_writeback(struct ploop *ploop)
 {
 	unsigned long flags;
@@ -1958,6 +1965,33 @@ static void process_ploop_fsync_work(struct ploop *ploop, struct llist_node *llf
 	}
 }
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio)
+{
+	struct ploop_worker *wrkr;
+
+	wrkr = READ_ONCE(ploop->last_used_runner)->next;	/* round-robin: next runner in the ring */
+	WRITE_ONCE(ploop->last_used_runner, wrkr);
+
+	atomic_inc(&ploop->kt_worker->inflight_pios);	/* counted before the runner can see the pio */
+	llist_add((struct llist_node *)(&pio->list), &wrkr->work_llist);	/* NOTE(review): reuses pio->list as an llist_node */
+	wake_up_process(wrkr->task);
+
+	return 0;	/* always succeeds */
+}
+
+static inline int ploop_runners_add_work_list(struct ploop *ploop, struct llist_node *list)
+{
+	struct llist_node *pos, *t;	/* _safe walk: llist_add() relinks each node */
+	struct pio *pio;
+
+	llist_for_each_safe(pos, t, list) {
+		pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+		ploop_runners_add_work(ploop, pio);	/* spread pios over runners round-robin */
+	}
+
+	return 0;
+}
+
 void do_ploop_run_work(struct ploop *ploop)
 {
 	LLIST_HEAD(deferred_pios);
@@ -2017,30 +2051,110 @@ void do_ploop_work(struct work_struct *ws)
 	do_ploop_run_work(ploop);
 }
 
-int ploop_worker(void *data)
+int ploop_pio_runner(void *data)
 {
 	struct ploop_worker *worker = data;
 	struct ploop *ploop = worker->ploop;
+	struct llist_node *llwork;
+	struct pio *pio;
+	struct llist_node *pos, *t;
+	unsigned int old_flags = current->flags;	/* restored at the top of every pass */
+	int did_process_pios = 0;

 	for (;;) {
+		current->flags = old_flags;	/* drop the I/O flags set below */
 		set_current_state(TASK_INTERRUPTIBLE);

-		if (kthread_should_stop()) {
-			__set_current_state(TASK_RUNNING);
-			break;
+		/* Take all queued pios; ploop_runners_add_work() wakes us for more */
+		llwork = llist_del_all(&worker->work_llist);
+		if (!llwork) {
+			if (did_process_pios) {
+				did_process_pios = 0;
+				wake_up_interruptible(&ploop->dispatcher_wq_data);
+			}
+			/* Only stop when there are no more pios */
+			if (kthread_should_stop()) {
+				__set_current_state(TASK_RUNNING);
+				break;
+			}
+			schedule();
+			continue;
 		}
+		__set_current_state(TASK_RUNNING);
+		old_flags = current->flags;	/* pristine: restored at loop top */
+		current->flags |= PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
+
+		llist_for_each_safe(pos, t, llwork) {
+			pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+			INIT_LIST_HEAD(&pio->list);
+			switch (pio->queue_list_id) {
+			case PLOOP_LIST_FLUSH:
+				WARN_ON_ONCE(1);	/* We must not see flushes here */
+				break;
+			case PLOOP_LIST_PREPARE:
+				// fsync pios can come here for endio
+				// XXX: make it a FSYNC list
+				ploop_pio_endio(pio);
+				break;
+			case PLOOP_LIST_DEFERRED:
+				ploop_process_one_deferred_bio(ploop, pio);
+				break;
+			case PLOOP_LIST_COW:
+				ploop_process_one_delta_cow(ploop, pio);
+				break;
+			case PLOOP_LIST_DISCARD:
+				ploop_process_one_discard_pio(ploop, pio);
+				break;
+				// XXX: make it list MDWB
+			case PLOOP_LIST_INVALID: /* resubmit sets the list id to invalid */
+				ploop_submit_rw_mapped(ploop, pio);
+				break;
+			default:
+				WARN_ON_ONCE(1);
+			}
+			atomic_dec(&ploop->kt_worker->inflight_pios);
+		}
+		cond_resched();
+		did_process_pios = 1;
+		continue;	/* restore flags, re-arm TASK_INTERRUPTIBLE, recheck */
+	}
+	return 0;
+}
+
+int ploop_worker(void *data)
+{
+	struct ploop_worker *worker = data;	/* the dispatcher ("d") thread */
+	struct ploop *ploop = worker->ploop;
+
+	for (;;) {
+		set_current_state(TASK_INTERRUPTIBLE);
+
 		if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) &&
-			llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) &&
-			llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
-			llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
-			llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
-			llist_empty(&ploop->llresubmit_pios)
-			)
+		    llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) &&
+		    llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
+		    llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
+		    llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
+		    llist_empty(&ploop->llresubmit_pios) &&
+		    !ploop->force_md_writeback) {	/* idle: nothing queued */
+			if (kthread_should_stop()) {	/* drain the runners before exiting */
+				wait_event_interruptible(ploop->dispatcher_wq_data,
+						(!ploop_runners_have_pending(ploop)));
+				__set_current_state(TASK_RUNNING);
+				break;
+			}
 			schedule();
+			/* woken up: fall through and process what was queued */
+		}

 		__set_current_state(TASK_RUNNING);
 		do_ploop_run_work(ploop);
-		cond_resched();
+		cond_resched(); /* give other processes a chance to run */
+		if (kthread_should_stop()) {	/* NOTE(review): exits without re-checking the lists - confirm nothing can be requeued here */
+			wait_event_interruptible(ploop->dispatcher_wq_data,
+						(!ploop_runners_have_pending(ploop)));
+			__set_current_state(TASK_RUNNING);
+			break;
+		}
 	}
 	return 0;
 }
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index dc63c18cece8..3fed26137831 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -164,6 +164,7 @@ static void ploop_destroy(struct ploop *ploop)
 	int i;
 
 	if (ploop->kt_worker) {
+		ploop->force_md_writeback = 1;
 		wake_up_process(ploop->kt_worker->task);
 		/* try to send all pending - if we have partial io and enospc end bellow */
 		while (!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) ||
@@ -175,9 +176,22 @@ static void ploop_destroy(struct ploop *ploop)
 			schedule();
 		}
 
+		/* Dispatcher first: it drains the runners on exit, so none is freed under it */
 		kthread_stop(ploop->kt_worker->task);	/* waits for the thread to stop */
+		if (ploop->kt_runners) {
+			for (i = 0; i < ploop->nkt_runners; i++) {
+				if (ploop->kt_runners[i]) {
+					wake_up_process(ploop->kt_runners[i]->task);
+					kthread_stop(ploop->kt_runners[i]->task);
+					kfree(ploop->kt_runners[i]);
+				}
+			}
+		}
+
 		WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]));
 		WARN_ON(!llist_empty(&ploop->llresubmit_pios));
+		WARN_ON(!llist_empty(&ploop->enospc_pios));
+		kfree(ploop->kt_runners);
 		kfree(ploop->kt_worker);
 	}
 
@@ -347,7 +361,8 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO);
 		argv++;						\
 	} while (0);
 
-static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
+static struct ploop_worker *ploop_worker_create(struct ploop *ploop,
+	int (*worker_fn)(void *), const char *pref, int id)
 {
 	struct ploop_worker *worker;
 	struct task_struct *task;
@@ -357,12 +372,13 @@ static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
 		return NULL;
 
 	worker->ploop = ploop;
-	task = kthread_create(ploop_worker, worker, "ploop-%d-0",
-				current->pid);
+	task = kthread_create(worker_fn, worker, "ploop-%d-%s-%d",
+			current->pid, pref, id);	/* e.g. "ploop-<pid>-d-0", "ploop-<pid>-r-1" */
 
 	if (IS_ERR(task))
 		goto out_err;
 	worker->task = task;
+	init_llist_head(&worker->work_llist);	/* before wake_up_process() below starts the thread */
 
 	wake_up_process(task);
 
@@ -521,10 +537,30 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 		goto err;
 
 
-	ploop->kt_worker = ploop_worker_create(ploop);
+	init_waitqueue_head(&ploop->dispatcher_wq_data);
+
+	ploop->kt_worker = ploop_worker_create(ploop, ploop_worker, "d", 0);
 	if (!ploop->kt_worker)
 		goto err;
 
+/* TODO: make it a parameter - module, CPU based or device request queue */
+#define PLOOP_PIO_RUNNERS nr_cpu_ids
+	ploop->kt_runners = kcalloc(PLOOP_PIO_RUNNERS, sizeof(*ploop->kt_runners), GFP_KERNEL);
+	if (!ploop->kt_runners)
+		goto err;
+
+	ploop->nkt_runners = PLOOP_PIO_RUNNERS;	/* set first: ploop_destroy() skips NULL slots */
+	for (i = 0; i < ploop->nkt_runners; i++) {
+		ploop->kt_runners[i] = ploop_worker_create(ploop, ploop_pio_runner, "r", i + 1);
+		if (!ploop->kt_runners[i])
+			goto err;
+	}
+
+	/* Link the runners into a ring; dispatch walks it round-robin */
+	for (i = 0; i < ploop->nkt_runners; i++)
+		ploop->kt_runners[i]->next = ploop->kt_runners[(i + 1) % ploop->nkt_runners];
+	ploop->last_used_runner = ploop->kt_runners[0];
+
 	ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
 	if (ret)
 		goto err;
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 6f1b1e284fc3..452fc4a6c58f 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -146,14 +146,17 @@ enum {
 struct ploop_worker {
 	struct ploop		*ploop;
 	struct task_struct	*task;
-	u64			kcov_handle;
+	struct llist_head	work_llist;	/* pios handed to this runner */
+	atomic_t		inflight_pios;	/* in this patch only kt_worker's counter is used */
+	struct ploop_worker	*next;	/* next runner in the dispatch ring */
 };
 
 struct ploop {
+	struct wait_queue_head dispatcher_wq_data;
 	struct dm_target *ti;
 #define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */
 	mempool_t *prq_pool;
-#define PLOOP_PIO_POOL_SIZE 256
+#define PLOOP_PIO_POOL_SIZE 512
 	mempool_t *pio_pool;
 
 	struct rb_root bat_entries;
@@ -198,7 +201,10 @@ struct ploop {
 	struct work_struct worker;
 	struct work_struct event_work;
 
-	struct ploop_worker *kt_worker;
+	struct ploop_worker *kt_worker;	 /* dispatcher thread */
+	struct ploop_worker **kt_runners; /* pio runners */
+	unsigned int nkt_runners;
+	struct ploop_worker *last_used_runner;
 	struct completion inflight_bios_ref_comp;
 	struct percpu_ref inflight_bios_ref[2];
 	bool inflight_ref_comp_pending;
@@ -608,6 +614,7 @@ extern void ploop_enospc_timer(struct timer_list *timer);
 extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int whence);
 
 extern int ploop_worker(void *data);
+extern int ploop_pio_runner(void *data);
 
 extern void ploop_disable_writeback_delay(struct ploop *ploop);
 extern void ploop_enable_writeback_delay(struct ploop *ploop);
-- 
2.43.0



More information about the Devel mailing list