[Devel] [RFC PATCH vz9 v6 61/62] dm-ploop: rework bat completion logic
Andrey Zhadchenko
andrey.zhadchenko at virtuozzo.com
Fri Dec 20 17:23:56 MSK 2024
On 12/5/24 22:56, Alexander Atanasov wrote:
> a pio may complete after md page update, in that case we
> must not complete the update but wait for the last data pio
> and only then complete them all.
>
> https://virtuozzo.atlassian.net/browse/VSTOR-91821
> Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
> ---
> drivers/md/dm-ploop-map.c | 57 ++++++++++++++++++++++++++-------------
> drivers/md/dm-ploop.h | 1 +
> 2 files changed, 39 insertions(+), 19 deletions(-)
>
> diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
> index 482022d6b60b..9da996935945 100644
> --- a/drivers/md/dm-ploop-map.c
> +++ b/drivers/md/dm-ploop-map.c
> @@ -598,7 +598,6 @@ static void ploop_unlink_completed_pio(struct ploop *ploop, struct pio *pio)
>
> static bool ploop_md_make_dirty(struct ploop *ploop, struct md_page *md)
> {
> - unsigned long flags;
> bool new = false;
>
> /*
> @@ -896,6 +895,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
> if (wait_llist_pending) {
> llist_for_each_safe(pos, t, wait_llist_pending) {
> pio = list_entry((struct list_head *)pos, typeof(*pio), list);
> + INIT_LIST_HEAD(&pio->list);
> list_add(&pio->list, &list);
> }
> }
> @@ -962,15 +962,22 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
> */
> ploop_advance_local_after_bat_wb(ploop, piwb, true);
> }
> + } else {
> + PL_ERR("piwb not completed\n");
> }
>
> spin_lock_irqsave(&piwb->lock, flags);
> - if (completed)
> + if (completed) {
> piwb->completed = completed;
> - piwb->bi_status = bi_status;
> - ll_ready_pios = llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
> + ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
> + } else {
> + piwb->wait_for_last = 1;
> + }
> + if (bi_status)
> + piwb->bi_status = bi_status;
> spin_unlock_irqrestore(&piwb->lock, flags);
> -
> + if (!completed)
> + return;
> ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
>
> /*
> @@ -978,8 +985,8 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
> */
>
> llist_for_each_safe(pos, t, ll_ready_pios) {
> - pio = list_entry((struct list_head *)pos, typeof(*aux_pio), list);
> - INIT_LIST_HEAD(&pio->list);
> + data_pio = list_entry((struct list_head *)pos, typeof(*data_pio), list);
> + INIT_LIST_HEAD(&data_pio->list);
> if (bi_status)
> data_pio->bi_status = bi_status;
> ploop_pio_endio(data_pio);
> @@ -1026,9 +1033,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
> piwb->pio = pio = ploop_alloc_pio(ploop, GFP_NOIO);
> if (!page || !pio)
> goto err;
> - piwb->kmpage = kmap(piwb->bat_page);
> ploop_init_pio(ploop, REQ_OP_WRITE, pio);
> -
> + piwb->kmpage = kmap(piwb->bat_page);
> + WARN_ON(!piwb->kmpage);
> + piwb->wait_for_last = 0;
> bat_entries = md->kmpage;
>
> spin_lock_irq(&md->md_lock);
> @@ -1251,6 +1259,7 @@ static bool ploop_data_pio_end(struct pio *pio)
> struct ploop_index_wb *piwb = pio->piwb;
> unsigned long flags;
> bool completed;
> + int last_pio = 0;
>
> spin_lock_irqsave(&piwb->lock, flags);
> completed = piwb->completed;
> @@ -1258,12 +1267,26 @@ static bool ploop_data_pio_end(struct pio *pio)
> llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
> else if (!pio->bi_status)
> pio->bi_status = piwb->bi_status;
> - spin_unlock_irqrestore(&piwb->lock, flags);
> /* If pio is late then end it here. this can happen with flushes */
> - if (completed)
> + if (atomic_read(&piwb->count) == 2) {
I think this is really complicated. Why not just call
ploop_bat_write_complete() when count reaches zero (and dec count on
data pio end or metadata pio end).
In current scheme I think we will explode if this code is run
in-parallel. Imagine md write is complete, but several data pios are
ended simultaneously and execute this in-parallel. They check that count
is not 2 (for example there is 3 parallel datapio). Now all of them have
last_pio == 0. Then all 3 go ploop_put_piwb(). piwb count is now zero
and no one has called ploop_bat_write_complete().
piwb->lock didn't help us since there is still may be parallel execution
between unlock and piwb->count dec inside ploop_put_piwb().
But maybe I am missing something
> + if (piwb->wait_for_last)
> + last_pio = 1;
> + }
> + spin_unlock_irqrestore(&piwb->lock, flags);
> + ploop_put_piwb(piwb);
> + if (completed) {
> + struct ploop *ploop = pio->ploop;
> +
> + PL_ERR("late pio already completed\n");
> ploop_pio_endio(pio);
> + } else if (last_pio) {
> + /*
> + * Status is set from first call to ploop_bat_write_complete
> + * zero keeps it as is
> + */
> + ploop_bat_write_complete(piwb->pio, piwb, piwb->bi_status);
> + }
>
> - ploop_put_piwb(piwb);
>
> return completed;
> }
> @@ -1309,8 +1332,6 @@ static void ploop_check_standby_mode(struct ploop *ploop, long res)
>
> static void ploop_data_rw_complete(struct pio *pio)
> {
> - bool completed;
> -
> if (pio->ret != pio->bi_iter.bi_size) {
> if (pio->ret >= 0 || pio->ret == -ENOTBLK) {
> /* Partial IO or request to retry in buffered mode */
> @@ -1338,12 +1359,10 @@ static void ploop_data_rw_complete(struct pio *pio)
> pio->bi_status = errno_to_blk_status(pio->ret);
> }
> if (pio->is_data_alloc) {
> - completed = ploop_data_pio_end(pio);
> - if (!completed)
> - return;
> + ploop_data_pio_end(pio);
> + } else {
> + ploop_pio_endio(pio);
> }
> -
> - ploop_pio_endio(pio);
> }
>
> /*
> diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
> index 5ad2a5142521..cc66f9dcd27f 100644
> --- a/drivers/md/dm-ploop.h
> +++ b/drivers/md/dm-ploop.h
> @@ -111,6 +111,7 @@ struct ploop_index_wb {
> u32 page_id;
> struct bio_vec aux_bvec;
> struct pio *flush_pio;
> + int wait_for_last;
> };
>
> /* Metadata page */
More information about the Devel
mailing list