[Devel] [RFC PATCH vz9 v5 45/49] dm-ploop: introduce pio runner threads

Alexander Atanasov alexander.atanasov at virtuozzo.com
Mon Nov 18 09:25:12 MSK 2024


Create threads to execute pios in parallel - call them pio runners.
Use the number of CPUs to determine the number of threads started.
From the worker, each pio is sent to a runner thread in round-robin
fashion through the runner's work_llist. Maintain the number of pios
sent so we can wait for them to be processed - NB we only want to keep
the order of execution of different pio types, which can be different
from the order of their completion. We send a batch of pios to the
runners and, if necessary, wait for them to be processed before moving
forward - we need this for metadata writeback and flushes.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
---
 drivers/md/dm-ploop-map.c    | 131 ++++++++++++++++++++++++++++++++---
 drivers/md/dm-ploop-target.c |  44 ++++++++++--
 drivers/md/dm-ploop.h        |  14 +++-
 3 files changed, 170 insertions(+), 19 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index c7c07234571b..d8d8c7d66e87 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -20,6 +20,8 @@
 #include "dm-ploop.h"
 #include "dm-rq.h"
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+
 #define PREALLOC_SIZE (128ULL * 1024 * 1024)
 
 static void ploop_handle_cleanup(struct ploop *ploop, struct pio *pio);
@@ -1980,6 +1982,35 @@ static void process_ploop_fsync_work(struct ploop *ploop, struct llist_node *llf
 	}
 }
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio)
+{
+	/* Queue @pio on the next runner in round-robin order and wake it; returns 0 */
+	struct ploop_worker *wrkr;
+
+	if (++ploop->last_used_runner >= ploop->nkt_runners)
+		ploop->last_used_runner = 0;
+	wrkr = ploop->kt_runners[ploop->last_used_runner];
+
+	atomic_inc(&ploop->kt_worker->inflight_pios);
+	llist_add((struct llist_node *)(&pio->list), &wrkr->work_llist);
+	wake_up_process(wrkr->task);
+
+	return 0;
+}
+
+static inline int ploop_runners_add_work_list(struct ploop *ploop, struct llist_node *list)
+{
+	struct llist_node *pos, *t;
+	struct pio *pio;
+
+	/* _safe: ploop_runners_add_work() relinks pio->list onto a runner's llist */
+	llist_for_each_safe(pos, t, list) {
+		pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+		ploop_runners_add_work(ploop, pio);
+	}
+	return 0;
+}
+
 void do_ploop_run_work(struct ploop *ploop)
 {
 	LLIST_HEAD(deferred_pios);
@@ -2040,18 +2071,86 @@ void do_ploop_work(struct work_struct *ws)
 	do_ploop_run_work(ploop);
 }
 
+int ploop_pio_runner(void *data)
+{
+	/* Raised only while executing pios; only these bits are saved/restored */
+	const unsigned int io_flags = PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
+	struct ploop_worker *worker = data;
+	struct ploop *ploop = worker->ploop;
+	struct llist_node *llwork, *pos, *t;
+	struct pio *pio;
+	unsigned int old_flags = current->flags & io_flags;
+	int did_process_pios = 0;
+
+	for (;;) {
+		current->flags = (current->flags & ~io_flags) | old_flags;
+		clear_bit(PLOOP_WRK_BUSY, &worker->status);
+		set_current_state(TASK_INTERRUPTIBLE);
+
+check_for_more:
+		llwork = llist_del_all(&worker->work_llist);
+		if (!llwork) {
+			if (did_process_pios) {
+				did_process_pios = 0;
+				wake_up_interruptible(&ploop->dispatcher_wq_data);
+			}
+			/* Only stop once the work list has been found empty */
+			if (kthread_should_stop()) {
+				__set_current_state(TASK_RUNNING);
+				break;
+			}
+			schedule();
+			continue;
+		}
+		set_bit(PLOOP_WRK_BUSY, &worker->status);
+		__set_current_state(TASK_RUNNING);
+		current->flags |= io_flags;
+
+		llist_for_each_safe(pos, t, llwork) {
+			pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+			INIT_LIST_HEAD(&pio->list);
+			switch (pio->queue_list_id) {
+			case PLOOP_LIST_FLUSH:
+				WARN_ON_ONCE(1);	/* We must not see flushes here */
+				break;
+			case PLOOP_LIST_PREPARE:
+				/* fsync pios can come here for endio */
+				/* XXX: make it a FSYNC list */
+				ploop_pio_endio(pio);
+				break;
+			case PLOOP_LIST_DEFERRED:
+				ploop_process_one_deferred_bio(ploop, pio);
+				break;
+			case PLOOP_LIST_COW:
+				ploop_process_one_delta_cow(ploop, pio);
+				break;
+			case PLOOP_LIST_DISCARD:
+				ploop_process_one_discard_pio(ploop, pio);
+				break;
+				/* XXX: make it list MDWB */
+			case PLOOP_LIST_INVALID: /* resubmit sets the list id to invalid */
+				ploop_submit_rw_mapped(ploop, pio);
+				break;
+			default:
+				WARN_ON_ONCE(1);
+			}
+			atomic_dec(&ploop->kt_worker->inflight_pios);
+		}
+		cond_resched();
+		did_process_pios = 1;
+		goto check_for_more;
+	}
+	return 0;
+}
+
 int ploop_worker(void *data)
 {
 	struct ploop_worker *worker = data;
 	struct ploop *ploop = worker->ploop;
 
-        for (;;) {
-                set_current_state(TASK_INTERRUPTIBLE);
+	for (;;) {
+		set_current_state(TASK_INTERRUPTIBLE);
 
-                if (kthread_should_stop()) {
-                        __set_current_state(TASK_RUNNING);
-                        break;
-                }
 #ifdef USE_KTHREAD
 		smp_rmb();
 		if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) &&
@@ -2059,16 +2158,30 @@ int ploop_worker(void *data)
 			llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
 			llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
 			llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
-			llist_empty(&ploop->llresubmit_pios)
-			)
+			llist_empty(&ploop->llresubmit_pios) &&
+			!ploop->force_md_writeback
+			) {
+
+				if (kthread_should_stop()) {
+					wait_event_interruptible(ploop->dispatcher_wq_data, (!ploop_runners_have_pending(ploop)));
+					__set_current_state(TASK_RUNNING);
+					break;
+				}
 				schedule();
+				/* Fall thru - check for pending work */
+		}
 
 		__set_current_state(TASK_RUNNING);
 		do_ploop_run_work(ploop);
-                cond_resched();
+		cond_resched(); /* give other processes chance to run */
 #else
 		schedule();	// just do nothing yet
 #endif
+		if (kthread_should_stop()) {
+			wait_event_interruptible(ploop->dispatcher_wq_data, (!ploop_runners_have_pending(ploop)));
+			__set_current_state(TASK_RUNNING);
+			break;
+		}
         }
         return 0;
 
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index 57de1801e5cd..0fa36a7e58c9 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -192,8 +192,8 @@ static void ploop_destroy(struct ploop *ploop)
 	}
 
 	if (ploop->kt_worker) {
+		ploop->force_md_writeback = 1;
 		wake_up_process(ploop->kt_worker->task);
-		// FIXME: a better way to wait for lists to drain
 		/* try to send all pending - if we have partial io and enospc end bellow */
 		while(!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) ||
 			!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) ||
@@ -205,12 +205,26 @@ static void ploop_destroy(struct ploop *ploop)
 				smp_rmb();
 			}
 
+		if (ploop->kt_runners) {
+			for (i=0; i < ploop->nkt_runners; i++) {
+				if (ploop->kt_runners[i]) {
+					wake_up_process(ploop->kt_runners[i]->task);
+					kthread_stop(ploop->kt_runners[i]->task);
+					kfree(ploop->kt_runners[i]);
+			        }
+			}
+		}
+
 	        kthread_stop(ploop->kt_worker->task);	/* waits for the thread to stop */
+
 		WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]));
 		WARN_ON(!llist_empty(&ploop->llresubmit_pios));
-	       	// TODO: check if any pios left and end them with error
-        	kfree(ploop->kt_worker);
+		WARN_ON(!llist_empty(&ploop->enospc_pios));
+		kfree(ploop->kt_runners);
+		kfree(ploop->kt_worker);
 	}
+	WARN_ON_ONCE(ploop_has_pending_activity(ploop));
+
 	print_ploop_lists(ploop);
 
 	for (i = 0; i < 2; i++)
@@ -378,7 +392,8 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO);
 		argv++;						\
 	} while (0);
 
-static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
+static struct ploop_worker *ploop_worker_create(struct ploop *ploop,
+	int (*worker_fn)(void *), const char *pref, int id)
 {
 	struct ploop_worker *worker;
 	struct task_struct *task;
@@ -388,12 +403,13 @@ static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
                 return NULL;
 
         worker->ploop = ploop;
-	task = kthread_create(ploop_worker, worker, "ploop-%d-0",
-			current->pid);
+	task = kthread_create(worker_fn, worker, "ploop-%d-%s-%d",
+			current->pid, pref, id);
 
 	if (IS_ERR(task))
 		goto out_err;
 	worker->task = task;
+	init_llist_head(&worker->work_llist);
 
 	wake_up_process(task);
 	// TODO: handle cgroups
@@ -566,10 +582,24 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 		goto err;
 
 
-	ploop->kt_worker = ploop_worker_create(ploop);
+	init_waitqueue_head(&ploop->dispatcher_wq_data);
+
+	ploop->kt_worker = ploop_worker_create(ploop, ploop_worker, "d", 0);
 	if (!ploop->kt_worker)
 		goto err;
 
+/* make it a param = either module or cpu based or dev req queue */
+#define PLOOP_PIO_RUNNERS nr_cpu_ids
+	ploop->kt_runners = kcalloc(PLOOP_PIO_RUNNERS, sizeof(struct kt_worker *), GFP_KERNEL);
+	if (!ploop->kt_runners)
+		goto err;
+
+	ploop->nkt_runners = PLOOP_PIO_RUNNERS;
+	for (i=0; i < ploop->nkt_runners; i++) {
+		ploop->kt_runners[i] = ploop_worker_create(ploop, ploop_pio_runner, "r", i+1);
+		if (!ploop->kt_runners[i])
+			goto err;
+	}
 	ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
 	if (ret)
 		goto err;
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 78252f5ae6a1..98533900a42d 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -148,14 +148,18 @@ enum {
 struct ploop_worker {
 	struct ploop 		*ploop;
 	struct task_struct 	*task;
-	u64			kcov_handle;
+	struct llist_head 	work_llist;	/* pios queued for this thread */
+#define PLOOP_WRK_BUSY		0	/* bit number in status for set_bit()/clear_bit() */
+	unsigned long 		status;
+	atomic_t		inflight_pios;	/* pios sent to runners, not yet processed */
 };
 
 struct ploop {
+	struct wait_queue_head dispatcher_wq_data;
 	struct dm_target *ti;
 #define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */
 	mempool_t *prq_pool;
-#define PLOOP_PIO_POOL_SIZE 256
+#define PLOOP_PIO_POOL_SIZE 512
 	mempool_t *pio_pool;
 
 	struct rb_root bat_entries;
@@ -201,7 +205,10 @@ struct ploop {
 	struct work_struct worker;
 	struct work_struct event_work;
 
-	struct ploop_worker *kt_worker;
+	struct ploop_worker *kt_worker;	 /* dispatcher thread */
+	struct ploop_worker **kt_runners; /* pio runners */
+	unsigned int nkt_runners;
+	unsigned int last_used_runner;
 	struct completion inflight_bios_ref_comp;
 	struct percpu_ref inflight_bios_ref[2];
 	bool inflight_ref_comp_pending;
@@ -613,6 +620,7 @@ extern void ploop_enospc_timer(struct timer_list *timer);
 extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int whence);
 
 extern int ploop_worker(void *data);
+extern int ploop_pio_runner(void *data);
 
 extern void ploop_disable_writeback_delay(struct ploop *ploop);
 extern void ploop_enable_writeback_delay(struct ploop *ploop);
-- 
2.43.0



More information about the Devel mailing list