[Devel] [RFC PATCH vz9 v6 12/62] dm-ploop: WIP move from wq to kthread

Pavel Tikhomirov ptikhomirov at virtuozzo.com
Fri Jan 10 13:28:18 MSK 2025



On 12/6/24 05:55, Alexander Atanasov wrote:
> This is a base for moving to multithreaded model.
> 
> https://virtuozzo.atlassian.net/browse/VSTOR-91821
> Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
> ---
>   drivers/md/dm-ploop-map.c    | 65 ++++++++++++++++++++++++++++++++----
>   drivers/md/dm-ploop-target.c | 60 +++++++++++++++++++++++++++++++++
>   drivers/md/dm-ploop.h        | 10 ++++++
>   3 files changed, 128 insertions(+), 7 deletions(-)
> 
> diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
> index 0240d6fc376c..28bced7684e5 100644
> --- a/drivers/md/dm-ploop-map.c
> +++ b/drivers/md/dm-ploop-map.c
> @@ -338,6 +338,16 @@ static int ploop_split_pio_to_list(struct ploop *ploop, struct pio *pio,
>   }
>   ALLOW_ERROR_INJECTION(ploop_split_pio_to_list, ERRNO);
>   
> +#define USE_KTHREAD
> +static void ploop_schedule_work(struct ploop *ploop)
> +{
> +#ifndef USE_KTHREAD
> +	queue_work(ploop->wq, &ploop->worker);
> +#else
> +	wake_up_process(ploop->kt_worker->task);
> +#endif
> +}
> +
>   static void ploop_dispatch_pio(struct ploop *ploop, struct pio *pio)
>   {
>   	struct llist_head *list = (struct llist_head *)&ploop->pios[pio->queue_list_id];
> @@ -361,7 +371,7 @@ void ploop_dispatch_pios(struct ploop *ploop, struct pio *pio,
>   			ploop_dispatch_pio(ploop, pio);
>   	}
>   
> -	queue_work(ploop->wq, &ploop->worker);
> +	ploop_schedule_work(ploop);
>   }
>   
>   static bool ploop_delay_if_md_busy(struct ploop *ploop, struct md_page *md,
> @@ -693,7 +703,7 @@ static void ploop_complete_cow(struct ploop_cow *cow, blk_status_t bi_status)
>   
>   	ploop_queue_or_fail(ploop, blk_status_to_errno(bi_status), cow_pio);
>   
> -	queue_work(ploop->wq, &ploop->worker);
> +	ploop_schedule_work(ploop);
>   	ploop_free_pio_with_pages(ploop, cow->aux_pio);
>   	kmem_cache_free(cow_cache, cow);
>   }
> @@ -1142,7 +1152,7 @@ static void ploop_queue_resubmit(struct pio *pio)
>   
>   	llist_add((struct llist_node *)(&pio->list), &ploop->llresubmit_pios);
>   
> -	queue_work(ploop->wq, &ploop->worker);
> +	ploop_schedule_work(ploop);
>   }
>   
>   static void ploop_check_standby_mode(struct ploop *ploop, long res)
> @@ -1815,9 +1825,8 @@ static void process_ploop_fsync_work(struct ploop *ploop)
>   	}
>   }
>   
> -void do_ploop_work(struct work_struct *ws)
> +void do_ploop_run_work(struct ploop *ploop)
>   {
> -	struct ploop *ploop = container_of(ws, struct ploop, worker);
>   	LIST_HEAD(deferred_pios);
>   	struct llist_node *llembedded_pios;
>   	struct llist_node *lldeferred_pios;
> @@ -1829,12 +1838,14 @@ void do_ploop_work(struct work_struct *ws)
>   	current->flags |= PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
>   
>   	llembedded_pios = llist_del_all(&ploop->pios[PLOOP_LIST_PREPARE]);
> +	smp_wmb(); /* */
> +
>   	lldeferred_pios = llist_del_all(&ploop->pios[PLOOP_LIST_DEFERRED]);
>   	lldiscard_pios = llist_del_all(&ploop->pios[PLOOP_LIST_DISCARD]);
>   	llcow_pios = llist_del_all(&ploop->pios[PLOOP_LIST_COW]);
>   	llresubmit = llist_del_all(&ploop->llresubmit_pios);
>   
> -	/* add old deferred to the list */
> +	/* add old deferred back to the list */
>   	if (lldeferred_pios) {
>   		struct llist_node *pos, *t;
>   		struct pio *pio;
> @@ -1866,6 +1877,46 @@ void do_ploop_work(struct work_struct *ws)
>   	current->flags = old_flags;
>   }
>   
> +void do_ploop_work(struct work_struct *ws)
> +{
> +	struct ploop *ploop = container_of(ws, struct ploop, worker);
> +
> +	do_ploop_run_work(ploop);
> +}
> +
> +int ploop_worker(void *data)
> +{
> +	struct ploop_worker *worker = data;
> +	struct ploop *ploop = worker->ploop;
> +
> +	for (;;) {
> +		set_current_state(TASK_INTERRUPTIBLE);
> +
> +		if (kthread_should_stop()) {
> +			__set_current_state(TASK_RUNNING);
> +			break;
> +		}
> +#ifdef USE_KTHREAD
> +		smp_rmb(); /* */

1. This requires an explanation both in the comments and in the commit
message. Which reads and which writes are you trying to separate with
these memory barriers?

2. It is very hard to guess. It can't be to separate the setting of the
task interruptible state from the llist_empty checks, because then you
should've used smp_mb (as you have both a write and reads). It could be
to separate the kthread_should_stop read from the llist_empty reads.
But, if that were so, then why do we have smp_rmb (and not smp_wmb) in
ploop_destroy, where we have kthread_stop, which is effectively a write?

3. Side note: here we probably need atomic flag setting, else the flags
can get corrupted in case of a concurrent ploop_destroy->kthread_stop.

do_ploop_run_work() {
   current->flags |= PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
}

4. Also, the use of smp_rmb in ploop_destroy looks like it is meant to
separate the llist_empty reads from each other across while-loop
iterations. But it's unclear to me why we might need that...

5. Also note that wake_up_process already has implicit write/read memory
barriers on a successful wakeup.

Please write some explanations, else reviewing it is too complex.

> +		if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) &&
> +			llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) &&
> +			llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
> +			llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
> +			llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
> +			llist_empty(&ploop->llresubmit_pios)
> +			)
> +			schedule();
> +
> +		__set_current_state(TASK_RUNNING);
> +		do_ploop_run_work(ploop);
> +		cond_resched();
> +#else
> +		schedule();	// just do nothing yet
> +#endif
> +	}
> +	return 0;
> +}
> +
>   static void ploop_submit_embedded_pio(struct ploop *ploop, struct pio *pio)
>   {
>   	struct ploop_rq *prq = pio->endio_cb_data;
> @@ -1890,7 +1941,7 @@ static void ploop_submit_embedded_pio(struct ploop *ploop, struct pio *pio)
>   
>   out:
>   	if (queue)
> -		queue_work(ploop->wq, &ploop->worker);
> +		ploop_schedule_work(ploop);
>   }
>   
>   void ploop_submit_embedded_pios(struct ploop *ploop, struct list_head *list)
> diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
> index ea9af6b6abe9..8fd3f51ff81f 100644
> --- a/drivers/md/dm-ploop-target.c
> +++ b/drivers/md/dm-ploop-target.c
> @@ -166,6 +166,28 @@ static void ploop_destroy(struct ploop *ploop)
>   		destroy_workqueue(ploop->wq);
>   		WARN_ON_ONCE(ploop_has_pending_activity(ploop));
>   	}
> +
> +	if (ploop->kt_worker) {
> +		wake_up_process(ploop->kt_worker->task);
> +		// FIXME: a better way to wait for lists to drain
> +		/* try to send all pending - if we have partial io and enospc end bellow */
> +		while (!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) ||
> +			!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) ||
> +			!llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) ||
> +			!llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) ||
> +			!llist_empty(&ploop->pios[PLOOP_LIST_COW])
> +			) {
> +			schedule();
> +			smp_rmb(); /* */
> +		}
> +
> +		kthread_stop(ploop->kt_worker->task);	/* waits for the thread to stop */
> +		WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]));
> +		WARN_ON(!llist_empty(&ploop->llresubmit_pios));
> +		// TODO: check if any pios left and end them with error
> +		kfree(ploop->kt_worker);
> +	}
> +
>   	for (i = 0; i < 2; i++)
>   		percpu_ref_exit(&ploop->inflight_bios_ref[i]);
>   	/* Nobody uses it after destroy_workqueue() */
> @@ -330,6 +352,39 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO);
>   		argc--;						\
>   		argv++;						\
>   	} while (0);
> +
> +static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
> +{
> +	struct ploop_worker *worker;
> +	struct task_struct *task;
> +
> +	worker = kzalloc(sizeof(*worker), GFP_KERNEL_ACCOUNT);
> +	if (!worker)
> +		return NULL;
> +
> +	worker->ploop = ploop;
> +	task = kthread_create(ploop_worker, worker, "ploop-%d-0",
> +				current->pid);
> +
> +	if (IS_ERR(task))
> +		goto out_err;
> +	worker->task = task;
> +
> +	wake_up_process(task);
> +	// TODO: handle cgroups
> +	// ret = ploop_attach_cgroups(worker);
> +	// if (ret)
> +	//         goto stop_worker;
> +
> +	return worker;
> +
> +// stop_worker:
> +//         kthread_stop(worker->task);
> +out_err:
> +	kfree(worker);
> +	return NULL;
> +}
> +#endif
>   /*
>    * <data dev>
>    */
> @@ -474,6 +529,11 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
>   	if (argc <= 0)
>   		goto err;
>   
> +
> +	ploop->kt_worker = ploop_worker_create(ploop);
> +	if (!ploop->kt_worker)
> +		goto err;
> +
>   	ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
>   	if (ret)
>   		goto err;
> diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
> index bd4906e4c2b5..f4d3dce02d6c 100644
> --- a/drivers/md/dm-ploop.h
> +++ b/drivers/md/dm-ploop.h
> @@ -139,6 +139,12 @@ enum {
>   	PLOOP_LIST_INVALID = PLOOP_LIST_COUNT,
>   };
>   
> +struct ploop_worker {
> +	struct ploop		*ploop;
> +	struct task_struct	*task;
> +	u64			kcov_handle;
> +};
> +
>   struct ploop {
>   	struct dm_target *ti;
>   #define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */
> @@ -184,6 +190,7 @@ struct ploop {
>   	struct work_struct worker;
>   	struct work_struct event_work;
>   
> +	struct ploop_worker *kt_worker;
>   	struct completion inflight_bios_ref_comp;
>   	struct percpu_ref inflight_bios_ref[2];
>   	bool inflight_ref_comp_pending;
> @@ -599,4 +606,7 @@ extern void ploop_call_rw_iter(struct file *file, loff_t pos, unsigned rw,
>   			       struct iov_iter *iter, struct pio *pio);
>   extern void ploop_enospc_timer(struct timer_list *timer);
>   extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int whence);
> +
> +int ploop_worker(void *data);
> +
>   #endif /* __DM_PLOOP_H */

-- 
Best regards, Tikhomirov Pavel
Senior Software Developer, Virtuozzo.



More information about the Devel mailing list