[Devel] [RFC PATCH vz9 v6 61/62] dm-ploop: rework bat completion logic

Alexander Atanasov alexander.atanasov at virtuozzo.com
Fri Dec 20 17:55:34 MSK 2024


On 20.12.24 16:23, Andrey Zhadchenko wrote:
> 
> 
> On 12/5/24 22:56, Alexander Atanasov wrote:
>> A pio may complete after the md page update; in that case we
>> must not complete the update, but wait for the last data pio
>> and only then complete them all.
>>
>> https://virtuozzo.atlassian.net/browse/VSTOR-91821
>> Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
>> ---
>>   drivers/md/dm-ploop-map.c | 57 ++++++++++++++++++++++++++-------------
>>   drivers/md/dm-ploop.h     |  1 +
>>   2 files changed, 39 insertions(+), 19 deletions(-)
>>
>> diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
>> index 482022d6b60b..9da996935945 100644
>> --- a/drivers/md/dm-ploop-map.c
>> +++ b/drivers/md/dm-ploop-map.c
>> @@ -598,7 +598,6 @@ static void ploop_unlink_completed_pio(struct ploop *ploop, struct pio *pio)
>>   static bool ploop_md_make_dirty(struct ploop *ploop, struct md_page *md)
>>   {
>> -    unsigned long flags;
>>       bool new = false;
>>       /*
>> @@ -896,6 +895,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
>>       if (wait_llist_pending) {
>>           llist_for_each_safe(pos, t, wait_llist_pending) {
>>               pio = list_entry((struct list_head *)pos, typeof(*pio), list);
>> +            INIT_LIST_HEAD(&pio->list);
>>               list_add(&pio->list, &list);
>>           }
>>       }
>> @@ -962,15 +962,22 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
>>                */
>>               ploop_advance_local_after_bat_wb(ploop, piwb, true);
>>           }
>> +    } else {
>> +        PL_ERR("piwb not completed\n");
>>       }
>>       spin_lock_irqsave(&piwb->lock, flags);
>> -    if (completed)
>> +    if (completed) {
>>           piwb->completed = completed;
>> -    piwb->bi_status = bi_status;
>> -    ll_ready_pios = llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
>> +        ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
>> +    } else {
>> +        piwb->wait_for_last = 1;
>> +    }
>> +    if (bi_status)
>> +        piwb->bi_status = bi_status;
>>       spin_unlock_irqrestore(&piwb->lock, flags);
>> -
>> +    if (!completed)
>> +        return;
>>       ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
>>       /*
>> @@ -978,8 +985,8 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
>>        */
>>       llist_for_each_safe(pos, t, ll_ready_pios) {
>> -        pio = list_entry((struct list_head *)pos, typeof(*aux_pio), list);
>> -        INIT_LIST_HEAD(&pio->list);
>> +        data_pio = list_entry((struct list_head *)pos, typeof(*data_pio), list);
>> +        INIT_LIST_HEAD(&data_pio->list);
>>           if (bi_status)
>>               data_pio->bi_status = bi_status;
>>           ploop_pio_endio(data_pio);
>> @@ -1026,9 +1033,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
>>       piwb->pio = pio = ploop_alloc_pio(ploop, GFP_NOIO);
>>       if (!page || !pio)
>>           goto err;
>> -    piwb->kmpage = kmap(piwb->bat_page);
>>       ploop_init_pio(ploop, REQ_OP_WRITE, pio);
>> -
>> +    piwb->kmpage = kmap(piwb->bat_page);
>> +    WARN_ON(!piwb->kmpage);
>> +    piwb->wait_for_last = 0;
>>       bat_entries = md->kmpage;
>>       spin_lock_irq(&md->md_lock);
>> @@ -1251,6 +1259,7 @@ static bool ploop_data_pio_end(struct pio *pio)
>>       struct ploop_index_wb *piwb = pio->piwb;
>>       unsigned long flags;
>>       bool completed;
>> +    int last_pio = 0;
>>       spin_lock_irqsave(&piwb->lock, flags);
>>       completed = piwb->completed;
>> @@ -1258,12 +1267,26 @@ static bool ploop_data_pio_end(struct pio *pio)
>>           llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
>>       else if (!pio->bi_status)
>>           pio->bi_status = piwb->bi_status;
>> -    spin_unlock_irqrestore(&piwb->lock, flags);
>>       /* If pio is late then end it here. this can happen with flushes */
>> -    if (completed)
>> +    if (atomic_read(&piwb->count) == 2) {
> 
> I think this is really complicated. Why not just call 
> ploop_bat_write_complete() when the count reaches zero (and decrement 
> the count on data pio end or metadata pio end)?
> In the current scheme I think we will explode if this code runs in 
> parallel. Imagine the md write is complete, but several data pios end 
> simultaneously and execute this in parallel. They check that the count 
> is not 2 (for example, there are 3 parallel data pios). Now all of 
> them have last_pio == 0. Then all 3 call ploop_put_piwb(). The piwb 
> count is now zero and no one has called ploop_bat_write_complete().
> piwb->lock does not help us, since there may still be parallel 
> execution between the unlock and the piwb->count decrement inside 
> ploop_put_piwb().


The count reaches zero in ploop_put_piwb(), where the piwb is freed, so 
you mean count 1. ploop_bat_write_complete() treats the write as 
complete if atomic_read(&piwb->count) == 1, i.e. it holds the last 
reference to the piwb, which is then dropped on put.
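
For reference, the put path as I read it (just a sketch to illustrate 
the counting - the argument list and the free helper are my guesses, 
not the actual source):

static void ploop_put_piwb(struct ploop_index_wb *piwb)
{
        /* each data pio plus the md write pio holds one reference */
        if (atomic_dec_and_test(&piwb->count))
                ploop_free_piwb(piwb); /* count hit zero: piwb goes away */
}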

We can not declare the bat write complete and end the related pios 
while some data pios are still in flight. But ploop_bat_write_complete() 
can be called before the last data pio completes, and that is the case 
this code tries to catch. What synchronizes the scenario you describe 
is piwb->wait_for_last - it is set and checked under a lock, and it is 
set only if ploop_bat_write_complete() has already been called, so the 
completion then has to be driven from ploop_data_pio_end(). But maybe 
you are right; as a solution I see checking again in ploop_put_piwb() 
whether ploop_bat_write_complete() still needs to be called.
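
I.e. roughly this in the put path (a sketch of the idea only - the 
wait_for_last check and the finish helper are assumptions that need to 
be verified, not the actual code):

static void ploop_put_piwb(struct ploop_index_wb *piwb)
{
        if (atomic_dec_and_test(&piwb->count)) {
                /*
                 * Last reference is gone, nothing can race with us now.
                 * If ploop_bat_write_complete() ran before the last data
                 * pio and parked the completion via wait_for_last, finish
                 * the deferred pios here before freeing the piwb.
                 */
                if (piwb->wait_for_last)
                        ploop_bat_write_finish(piwb); /* hypothetical helper */
                ploop_free_piwb(piwb);
        }
}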

I have to think about this.

-- 
Regards,
Alexander Atanasov


