[Devel] [PATCH RH8 3/3] ploop: Rework merge

Kirill Tkhai ktkhai at virtuozzo.com
Wed Jun 23 18:12:45 MSK 2021


The pio should be the main entity of the whole driver, and
waiting on a locked cluster should be implemented via pio
postponing. So, we rework merge to fit that.

Signed-off-by: Kirill Tkhai <ktkhai at virtuozzo.com>
---
 drivers/md/dm-ploop-cmd.c    |  200 ++++++++++++------------------------------
 drivers/md/dm-ploop-map.c    |   20 +++-
 drivers/md/dm-ploop-target.c |    5 +
 drivers/md/dm-ploop.h        |   32 ++++---
 4 files changed, 92 insertions(+), 165 deletions(-)

diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index b36bb158a3ac..401380de25db 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -3,21 +3,11 @@
 #include <linux/uio.h>
 #include <linux/ctype.h>
 #include <linux/umh.h>
+#include <linux/sched/signal.h>
 #include "dm-ploop.h"
 
 #define DM_MSG_PREFIX "ploop"
 
-static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
-{
-	unsigned long flags;
-
-	spin_lock_irqsave(&ploop->deferred_lock, flags);
-	BUG_ON(ploop->deferred_cmd && ploop->deferred_cmd != cmd);
-	ploop->deferred_cmd = cmd;
-	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
-	queue_work(ploop->wq, &ploop->worker);
-}
-
 /*
  * Assign newly allocated memory for BAT array and holes_bitmap
  * before grow.
@@ -557,8 +547,6 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors)
 	cmd.resize.hb_nr = hb_nr;
 	cmd.resize.new_sectors = new_sectors;
 	cmd.resize.md0 = md0;
-	cmd.retval = 0;
-	cmd.ploop = ploop;
 
 	ploop_suspend_submitting_pios(ploop);
 	ret = process_resize_cmd(ploop, &cmd);
@@ -570,106 +558,75 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors)
 	free_md_pages_tree(&cmd.resize.md_pages_root);
 	return ret;
 }
-
-static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop,
-					     int ret, void *data)
+static void service_pio_endio(struct pio *pio, void *data, blk_status_t status)
 {
-	struct ploop_cmd *cmd = data;
-
-	if (ret) {
-		/* kwork will see this at next time it is on cpu */
-		WRITE_ONCE(cmd->retval, ret);
-	}
-	atomic_inc(&cmd->merge.nr_available);
-	ploop_queue_deferred_cmd(cmd->ploop, cmd);
-}
-
-/* Find mergeable cluster and return it in cmd->merge.cluster */
-static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
-{
-	unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
-	u8 level;
-	bool skip;
-
-	BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT);
-
-	for (; *cluster < ploop->nr_bat_entries; ++*cluster) {
-		/*
-		 * Check *cluster is provided by the merged delta.
-		 * We are in kwork, so bat_rwlock is not needed
-		 * (see comment in process_one_deferred_bio()).
-		 */
-		/* FIXME: Optimize this. ploop_bat_entries() is overkill */
-		dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
-		if (dst_cluster == BAT_ENTRY_NONE ||
-		    level != ploop->nr_deltas - 2)
-			continue;
-
-		spin_lock_irq(&ploop->deferred_lock);
-		skip = find_lk_of_cluster(ploop, *cluster);
-		spin_unlock_irq(&ploop->deferred_lock);
-		if (skip) {
-			/*
-			 * Cluster is locked (maybe, under COW).
-			 * Skip it and try to repeat later.
-			 */
-			cmd->merge.do_repeat = true;
-			continue;
-		}
+	struct ploop *ploop = pio->ploop;
+	blk_status_t *status_ptr = data;
+	unsigned long flags;
 
-		return true;
+	if (unlikely(status)) {
+		spin_lock_irqsave(&ploop->err_status_lock, flags);
+		*status_ptr = status;
+		spin_unlock_irqrestore(&ploop->err_status_lock, flags);
 	}
 
-	return false;
+	if (atomic_dec_return(&ploop->service_pios) < MERGE_PIOS_MAX / 2)
+		wake_up(&ploop->service_wq);
 }
 
-static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
-					      struct ploop_cmd *cmd)
+static int process_merge_latest_snapshot(struct ploop *ploop)
 {
-	unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
-	u8 level;
-
-	if (cmd->retval)
-		goto out;
-
-	while (iter_delta_clusters(ploop, cmd)) {
-		/*
-		 * We are in kwork, so bat_rwlock is not needed
-		 * (we can't race with changing BAT, since cmds
-		 *  are processed before bios and piwb is sync).
-		 */
-		/* FIXME: Optimize this: ploop_bat_entries() is overkill */
-		dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
+	static blk_status_t service_status;
+	struct bio_vec bvec = {0};
+	struct pio *pio;
+	int ret = 0;
+	u32 clu;
 
-		/* Check we can submit one more cow in parallel */
-		if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0))
-			return;
-		/*
-		 * This adds cluster lk. Further write bios to *cluster will go
-		 * from ploop_map to kwork (because bat_levels[*cluster] is not
-		 * top_level()), so they will see the lk.
-		 */
-		if (submit_cluster_cow(ploop, level, *cluster, dst_cluster,
-				    ploop_queue_deferred_cmd_wrapper, cmd)) {
-			atomic_inc(&cmd->merge.nr_available);
-			cmd->retval = -ENOMEM;
-			goto out;
+	for (clu = 0; clu < ploop->nr_bat_entries; clu++) {
+		if (fatal_signal_pending(current)) {
+			ret = -EINTR;
+			break;
+		}
+		pio = kmalloc(sizeof(*pio), GFP_KERNEL);
+		if (!pio) {
+			ret = -ENOMEM;
+			break;
+		}
+		init_pio(ploop, REQ_OP_WRITE, pio);
+		pio->free_on_endio = true;
+		pio->bi_io_vec = &bvec;
+		pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu);
+		pio->bi_iter.bi_size = 0;
+		pio->bi_iter.bi_idx = 0;
+		pio->bi_iter.bi_bvec_done = 0;
+		pio->endio_cb = service_pio_endio;
+		pio->endio_cb_data = &service_status;
+		pio->is_fake_merge = true;
+		WARN_ON_ONCE(!fake_merge_pio(pio));
+
+		defer_pios(ploop, pio, NULL);
+
+		if (atomic_inc_return(&ploop->service_pios) == MERGE_PIOS_MAX) {
+			wait_event(ploop->service_wq,
+					atomic_read(&ploop->service_pios) < MERGE_PIOS_MAX);
 		}
 
-		++*cluster;
+		if (unlikely(READ_ONCE(service_status)))
+			break;
 	}
-out:
-	if (atomic_read(&cmd->merge.nr_available) != NR_MERGE_BIOS) {
-		/* Wait till last COW queues us */
-		return;
+
+	wait_event(ploop->service_wq, !atomic_read(&ploop->service_pios));
+	if (!ret) {
+		spin_lock_irq(&ploop->err_status_lock);
+		ret = blk_status_to_errno(service_status);
+		spin_unlock_irq(&ploop->err_status_lock);
 	}
 
-	complete(&cmd->comp); /* Last touch of cmd memory */
+	return ret;
 }
 
 static int ploop_merge_latest_snapshot(struct ploop *ploop)
 {
-	struct ploop_cmd cmd;
 	struct file *file;
 	u8 level;
 	int ret;
@@ -680,33 +637,14 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
 		return -EROFS;
 	if (ploop->nr_deltas < 2)
 		return -ENOENT;
-again:
-	memset(&cmd, 0, sizeof(cmd));
-	cmd.type = PLOOP_CMD_MERGE_SNAPSHOT;
-	cmd.ploop = ploop;
-	atomic_set(&cmd.merge.nr_available, NR_MERGE_BIOS);
-
-	init_completion(&cmd.comp);
-	ploop_queue_deferred_cmd(ploop, &cmd);
-	ret = wait_for_completion_interruptible(&cmd.comp);
-	if (ret) {
-		/*
-		 * process_merge_latest_snapshot_cmd() will see this
-		 * later or earlier. Take a lock if you want earlier.
-		 */
-		WRITE_ONCE(cmd.retval, -EINTR);
-		wait_for_completion(&cmd.comp);
-	}
 
-	if (cmd.retval)
+	ret = process_merge_latest_snapshot(ploop);
+	if (ret)
 		goto out;
 
-	if (cmd.merge.do_repeat)
-		goto again;
-
 	/* Delta merged. Release delta's file */
-	cmd.retval = ploop_suspend_submitting_pios(ploop);
-	if (cmd.retval)
+	ret = ploop_suspend_submitting_pios(ploop);
+	if (ret)
 		goto out;
 
 	write_lock_irq(&ploop->bat_rwlock);
@@ -719,7 +657,7 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
 
 	ploop_resume_submitting_pios(ploop);
 out:
-	return cmd.retval;
+	return ret;
 }
 
 static void notify_delta_merged(struct ploop *ploop, u8 level,
@@ -1154,28 +1092,6 @@ static int ploop_flip_upper_deltas(struct ploop *ploop)
 	return process_flip_upper_deltas(ploop);
 }
 
-/* Handle user commands requested via "message" interface */
-void process_deferred_cmd(struct ploop *ploop)
-	__releases(&ploop->deferred_lock)
-	__acquires(&ploop->deferred_lock)
-{
-	struct ploop_cmd *cmd = ploop->deferred_cmd;
-
-	if (likely(!cmd))
-		return;
-
-	ploop->deferred_cmd = NULL;
-	spin_unlock_irq(&ploop->deferred_lock);
-
-	if (cmd->type == PLOOP_CMD_MERGE_SNAPSHOT) {
-		process_merge_latest_snapshot_cmd(ploop, cmd);
-	} else {
-		cmd->retval = -EINVAL;
-		complete(&cmd->comp);
-	}
-	spin_lock_irq(&ploop->deferred_lock);
-}
-
 static int ploop_get_event(struct ploop *ploop, char *result, unsigned int maxlen)
 {
 	unsigned int sz = 0;
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index bcdc63a1d5c9..dc2268670f70 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -70,6 +70,7 @@ void init_pio(struct ploop *ploop, unsigned int bi_op, struct pio *pio)
 	pio->bi_op = bi_op;
 	pio->wants_discard_index_cleanup = false;
 	pio->is_data_alloc = false;
+	pio->is_fake_merge = false;
 	pio->free_on_endio = false;
 	pio->ref_index = PLOOP_REF_INDEX_INVALID;
 	pio->bi_status = BLK_STS_OK;
@@ -478,6 +479,14 @@ static bool pio_endio_if_all_zeros(struct pio *pio)
 	return true;
 }
 
+static bool pio_endio_if_merge_fake_pio(struct pio *pio)
+{
+	if (likely(!fake_merge_pio(pio)))
+		return false;
+	pio_endio(pio);
+	return true;
+}
+
 static int punch_hole(struct file *file, loff_t pos, loff_t len)
 {
 	return vfs_fallocate(file, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
@@ -1136,9 +1145,9 @@ static bool postpone_if_cluster_locked(struct ploop *ploop, struct pio *pio,
 	return e_h != NULL;
 }
 
-int submit_cluster_cow(struct ploop *ploop, unsigned int level,
-		       unsigned int cluster, unsigned int dst_cluster,
-		       void (*end_fn)(struct ploop *, int, void *), void *data)
+static int submit_cluster_cow(struct ploop *ploop, unsigned int level,
+			      unsigned int cluster, unsigned int dst_cluster,
+			      void (*end_fn)(struct ploop *, int, void *), void *data)
 {
 	struct ploop_cow *cow = NULL;
 	struct pio *pio = NULL;
@@ -1392,6 +1401,8 @@ static int process_one_deferred_bio(struct ploop *ploop, struct pio *pio,
 
 	if (cluster_is_in_top_delta(ploop, cluster)) {
 		/* Already mapped */
+		if (pio_endio_if_merge_fake_pio(pio))
+			goto out;
 		goto queue;
 	} else if (!op_is_write(pio->bi_op)) {
 		/*
@@ -1538,13 +1549,10 @@ void do_ploop_work(struct work_struct *ws)
 	 *
 	 * Currenly, it's impossible to submit two bat pages update
 	 * in parallel, since the update uses global ploop->bat_page.
-	 * Note, that process_deferred_cmd() expects there is no
-	 * pending index wb.
 	 */
 	ploop_index_wb_init(&piwb, ploop);
 
 	spin_lock_irq(&ploop->deferred_lock);
-	process_deferred_cmd(ploop);
 	process_delta_wb(ploop, &piwb);
 
 	list_splice_init(&ploop->deferred_pios, &deferred_pios);
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index 76f66fe11de1..3e05895d1cfe 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -123,10 +123,9 @@ void free_md_pages_tree(struct rb_root *root)
 
 static bool ploop_has_pending_activity(struct ploop *ploop)
 {
-	bool has;
+	bool has = false;
 
 	spin_lock_irq(&ploop->deferred_lock);
-	has = ploop->deferred_cmd;
 	has |= !list_empty(&ploop->deferred_pios);
 	has |= !list_empty(&ploop->discard_pios);
 	has |= !list_empty(&ploop->delta_cow_action_list);
@@ -312,7 +311,9 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 	}
 
 	rwlock_init(&ploop->bat_rwlock);
+	spin_lock_init(&ploop->err_status_lock);
 	init_rwsem(&ploop->ctl_rwsem);
+	init_waitqueue_head(&ploop->service_wq);
 	spin_lock_init(&ploop->inflight_lock);
 	spin_lock_init(&ploop->deferred_lock);
 
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 2a474e5d3cb6..a2d6866d99a5 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -40,12 +40,10 @@ struct ploop_delta {
 	bool is_raw;
 };
 
+#define MERGE_PIOS_MAX			64
+
 struct ploop_cmd {
-#define PLOOP_CMD_MERGE_SNAPSHOT	3
 	struct completion comp;
-	struct ploop *ploop;
-	unsigned int type;
-	int retval;
 	union {
 		struct {
 			sector_t new_sectors;
@@ -62,12 +60,6 @@ struct ploop_cmd {
 			unsigned int cluster, dst_cluster;
 			struct pio *pio;
 		} resize;
-		struct {
-#define NR_MERGE_BIOS			64
-			atomic_t nr_available;
-			unsigned int cluster; /* Currently iterated cluster */
-			bool do_repeat;
-		} merge;
 	};
 };
 
@@ -173,8 +165,11 @@ struct ploop {
 	struct list_head resubmit_pios; /* After partial IO */
 	struct list_head enospc_pios; /* Delayed after ENOSPC */
 
+	atomic_t service_pios;
+	struct wait_queue_head service_wq;
+
+	spinlock_t err_status_lock;
 	struct rw_semaphore ctl_rwsem;
-	struct ploop_cmd *deferred_cmd;
 
 	/*
 	 * List of locked clusters (no write is possible).
@@ -230,6 +225,7 @@ struct pio {
 
 	bool is_data_alloc:1;
 	bool wants_discard_index_cleanup:1;
+	bool is_fake_merge:1;
 	bool free_on_endio:1;
 	/*
 	 * 0 and 1 are related to inflight_bios_ref[],
@@ -486,6 +482,16 @@ static inline struct hlist_head *ploop_htable_slot(struct hlist_head head[], u32
 	return &head[hash_32(clu, PLOOP_HASH_TABLE_BITS)];
 }
 
+static inline bool fake_merge_pio(struct pio *pio)
+{
+	if (pio->is_fake_merge) {
+		WARN_ON_ONCE(pio->bi_iter.bi_size ||
+			     pio->bi_op != REQ_OP_WRITE);
+		return true;
+	}
+	return false;
+}
+
 extern void md_page_insert(struct ploop *ploop, struct md_page *md);
 extern void ploop_free_md_page(struct md_page *md);
 extern void free_md_pages_tree(struct rb_root *root);
@@ -499,7 +505,6 @@ extern void defer_pios(struct ploop *ploop, struct pio *pio, struct list_head *p
 extern void do_ploop_work(struct work_struct *ws);
 extern void do_ploop_fsync_work(struct work_struct *ws);
 extern void ploop_event_work(struct work_struct *work);
-extern void process_deferred_cmd(struct ploop *ploop);
 extern int ploop_clone_and_map(struct dm_target *ti, struct request *rq,
 		    union map_info *map_context, struct request **clone);
 extern struct pio *find_lk_of_cluster(struct ploop *ploop, u32 cluster);
@@ -514,9 +519,6 @@ extern void ploop_reset_bat_update(struct ploop_index_wb *);
 extern void ploop_submit_index_wb_sync(struct ploop *, struct ploop_index_wb *);
 extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
 			 char *result, unsigned int maxlen);
-extern int submit_cluster_cow(struct ploop *ploop, unsigned int level,
-			      unsigned int cluster, unsigned int dst_cluster,
-			      void (*end_fn)(struct ploop *, int, void *), void *data);
 
 extern struct pio * alloc_pio_with_pages(struct ploop *ploop);
 extern void free_pio_with_pages(struct ploop *ploop, struct pio *pio);




More information about the Devel mailing list