[Devel] [PATCH vz9 v1 55/63] dm-ploop: rework bat completion logic

Alexander Atanasov alexander.atanasov at virtuozzo.com
Fri Jan 24 18:36:29 MSK 2025


A data pio may complete after the md page update. In that case we
must not complete the update right away, but wait for the last data
pio and only then complete them all.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atanasov at virtuozzo.com>
---
 drivers/md/dm-ploop-map.c | 105 ++++++++++++++++++++------------------
 1 file changed, 54 insertions(+), 51 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 7d5e2cf443ee..35d12fd54050 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -21,6 +21,8 @@
 #include "dm-rq.h"
 
 static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+				     blk_status_t bi_status);
 
 #define PREALLOC_SIZE (128ULL * 1024 * 1024)
 
@@ -900,6 +902,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
 	if (wait_llist_pending) {
 		llist_for_each_safe(pos, t, wait_llist_pending) {
 			pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+			INIT_LIST_HEAD(&pio->list);
 			list_add(&pio->list, &list);
 		}
 	}
@@ -915,28 +918,31 @@ static void ploop_free_piwb(struct ploop_index_wb *piwb)
 	kfree(piwb);
 }
 
+
+static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr,
+				     blk_status_t bi_status);
 static void ploop_put_piwb(struct ploop_index_wb *piwb)
 {
 	if (atomic_dec_and_test(&piwb->count)) {
-		struct ploop *ploop = piwb->ploop;
-		/*
-		 * Index wb failed. Mark clusters as unallocated again.
-		 * piwb->count is zero, so all data writers compeleted.
-		 */
-		if (piwb->bi_status)
-			ploop_advance_local_after_bat_wb(ploop, piwb, false);
+
+		ploop_bat_write_finish(piwb->pio, piwb, piwb->bi_status);
 
 		if (piwb->comp) {
 			if (piwb->comp_bi_status)
 				*piwb->comp_bi_status = piwb->bi_status;
 			complete(piwb->comp);
 		}
+		/*
+		 * Status is set from the first call to ploop_bat_write_complete();
+		 * a zero status keeps it as is.
+		 */
+
 		ploop_free_piwb(piwb);
 	}
 }
 
 /* This handler is called after BAT is updated. */
-static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr,
 				     blk_status_t bi_status)
 {
 	struct ploop_index_wb *piwb = piwb_ptr;
@@ -950,30 +956,30 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
 	struct llist_node *pos, *t;
 	struct llist_node *ll_cow_pios;
 	struct llist_node *ll_ready_pios;
-	int completed = atomic_read(&piwb->count) == 1;
-
-	if (completed) {
-		/* We are the last count so it is safe to advance bat */
-		if (!bi_status) {
-			/*
-			 * Success: now update local BAT copy. We could do this
-			 * from our delayed work, but we want to publish new
-			 * mapping in the fastest way. This must be done before
-			 * data bios completion, since right after we complete
-			 * a bio, subsequent read wants to see written data
-			 * (ploop_map() wants to see not zero bat_entries[.]).
-			 */
-			ploop_advance_local_after_bat_wb(ploop, piwb, true);
-		}
+
+	if (!bi_status) {
+		/*
+		 * Success: now update local BAT copy. We could do this
+		 * from our delayed work, but we want to publish new
+		 * mapping in the fastest way. This must be done before
+		 * data bios completion, since right after we complete
+		 * a bio, subsequent read wants to see written data
+		 * (ploop_map() wants to see not zero bat_entries[.]).
+		 */
+		ploop_advance_local_after_bat_wb(ploop, piwb, true);
+	} else {
+		/*
+		 * Index wb failed. Mark clusters as unallocated again.
+		 * piwb->count is zero, so all data writers completed.
+		 */
+		ploop_advance_local_after_bat_wb(ploop, piwb, false);
 	}
 
 	spin_lock_irqsave(&piwb->lock, flags);
-	if (completed)
-		piwb->completed = completed;
-	piwb->bi_status = bi_status;
-	ll_ready_pios = llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
+	ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
+	if (bi_status)
+		piwb->bi_status = bi_status;
 	spin_unlock_irqrestore(&piwb->lock, flags);
-
 	ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
 
 	/*
@@ -981,8 +987,8 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
 	 */
 
 	llist_for_each_safe(pos, t, ll_ready_pios) {
-		pio = list_entry((struct list_head *)pos, typeof(*aux_pio), list);
-		INIT_LIST_HEAD(&pio->list);
+		data_pio = list_entry((struct list_head *)pos, typeof(*data_pio), list);
+		INIT_LIST_HEAD(&data_pio->list);
 		if (bi_status)
 			data_pio->bi_status = bi_status;
 		ploop_pio_endio(data_pio);
@@ -1000,11 +1006,17 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
 			ploop_dispatch_pios(ploop, flush_pio, NULL);
 		piwb->flush_pio = NULL;
 	}
+}
+
+static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+				     blk_status_t bi_status)
+
+{
+	struct ploop_index_wb *piwb = piwb_ptr;
+
+	if (bi_status)
+		piwb->bi_status = bi_status;
 
-	/*
-	 * In case of update BAT is failed, dst_clusters will be
-	 * set back to holes_bitmap on last put_piwb().
-	 */
 	ploop_put_piwb(piwb);
 }
 
@@ -1032,7 +1044,6 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
 	if (!page || !pio)
 		goto err;
 	ploop_init_pio(ploop, REQ_OP_WRITE, pio);
-
 	bat_entries = md->kmpage;
 
 	spin_lock(&md->md_lock); /* write */
@@ -1260,23 +1271,19 @@ static int ploop_alloc_cluster(struct ploop *ploop, struct ploop_index_wb *piwb,
 	return ret;
 }
 
-static bool ploop_data_pio_end(struct pio *pio)
+static void ploop_data_pio_end(struct pio *pio)
 {
 	struct ploop_index_wb *piwb = pio->piwb;
 	unsigned long flags;
-	bool completed;
 
 	spin_lock_irqsave(&piwb->lock, flags);
-	completed = piwb->completed;
-	if (!completed)
-		llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
-	else if (!pio->bi_status)
+	llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
+	if (!pio->bi_status)
 		pio->bi_status = piwb->bi_status;
-	spin_unlock_irqrestore(&piwb->lock, flags);
 
+	/* If pio is late then end it here. This can happen with flushes. */
+	spin_unlock_irqrestore(&piwb->lock, flags);
 	ploop_put_piwb(piwb);
-
-	return completed;
 }
 
 static void ploop_attach_end_action(struct pio *pio, struct ploop_index_wb *piwb)
@@ -1320,8 +1327,6 @@ static void ploop_check_standby_mode(struct ploop *ploop, long res)
 
 static void ploop_data_rw_complete(struct pio *pio)
 {
-	bool completed;
-
 	if (pio->ret != pio->bi_iter.bi_size) {
 		if (pio->ret >= 0 || pio->ret == -ENOTBLK) {
 			/* Partial IO or request to retry in buffered mode */
@@ -1356,12 +1361,10 @@ static void ploop_data_rw_complete(struct pio *pio)
 	}
 check_da:
 	if (pio->is_data_alloc) {
-		completed = ploop_data_pio_end(pio);
-		if (!completed)
-			return;
+		ploop_data_pio_end(pio);
+	} else {
+		ploop_pio_endio(pio);
 	}
-
-	ploop_pio_endio(pio);
 }
 
 /*
-- 
2.43.0



More information about the Devel mailing list