[Devel] [PATCH RH8 3/3] ploop: Rework merge
Kirill Tkhai
ktkhai at virtuozzo.com
Wed Jun 23 18:12:45 MSK 2021
pio should be main entity of all driver, and
waiting locked cluster should be made via pio
postponeing. So, we rework merge to fit that.
Signed-off-by: Kirill Tkhai <ktkhai at virtuozzo.com>
---
drivers/md/dm-ploop-cmd.c | 200 ++++++++++++------------------------------
drivers/md/dm-ploop-map.c | 20 +++-
drivers/md/dm-ploop-target.c | 5 +
drivers/md/dm-ploop.h | 32 ++++---
4 files changed, 92 insertions(+), 165 deletions(-)
diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index b36bb158a3ac..401380de25db 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -3,21 +3,11 @@
#include <linux/uio.h>
#include <linux/ctype.h>
#include <linux/umh.h>
+#include <linux/sched/signal.h>
#include "dm-ploop.h"
#define DM_MSG_PREFIX "ploop"
-static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
-{
- unsigned long flags;
-
- spin_lock_irqsave(&ploop->deferred_lock, flags);
- BUG_ON(ploop->deferred_cmd && ploop->deferred_cmd != cmd);
- ploop->deferred_cmd = cmd;
- spin_unlock_irqrestore(&ploop->deferred_lock, flags);
- queue_work(ploop->wq, &ploop->worker);
-}
-
/*
* Assign newly allocated memory for BAT array and holes_bitmap
* before grow.
@@ -557,8 +547,6 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors)
cmd.resize.hb_nr = hb_nr;
cmd.resize.new_sectors = new_sectors;
cmd.resize.md0 = md0;
- cmd.retval = 0;
- cmd.ploop = ploop;
ploop_suspend_submitting_pios(ploop);
ret = process_resize_cmd(ploop, &cmd);
@@ -570,106 +558,75 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors)
free_md_pages_tree(&cmd.resize.md_pages_root);
return ret;
}
-
-static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop,
- int ret, void *data)
+static void service_pio_endio(struct pio *pio, void *data, blk_status_t status)
{
- struct ploop_cmd *cmd = data;
-
- if (ret) {
- /* kwork will see this at next time it is on cpu */
- WRITE_ONCE(cmd->retval, ret);
- }
- atomic_inc(&cmd->merge.nr_available);
- ploop_queue_deferred_cmd(cmd->ploop, cmd);
-}
-
-/* Find mergeable cluster and return it in cmd->merge.cluster */
-static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
-{
- unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
- u8 level;
- bool skip;
-
- BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT);
-
- for (; *cluster < ploop->nr_bat_entries; ++*cluster) {
- /*
- * Check *cluster is provided by the merged delta.
- * We are in kwork, so bat_rwlock is not needed
- * (see comment in process_one_deferred_bio()).
- */
- /* FIXME: Optimize this. ploop_bat_entries() is overkill */
- dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
- if (dst_cluster == BAT_ENTRY_NONE ||
- level != ploop->nr_deltas - 2)
- continue;
-
- spin_lock_irq(&ploop->deferred_lock);
- skip = find_lk_of_cluster(ploop, *cluster);
- spin_unlock_irq(&ploop->deferred_lock);
- if (skip) {
- /*
- * Cluster is locked (maybe, under COW).
- * Skip it and try to repeat later.
- */
- cmd->merge.do_repeat = true;
- continue;
- }
+ struct ploop *ploop = pio->ploop;
+ blk_status_t *status_ptr = data;
+ unsigned long flags;
- return true;
+ if (unlikely(status)) {
+ spin_lock_irqsave(&ploop->err_status_lock, flags);
+ *status_ptr = status;
+ spin_unlock_irqrestore(&ploop->err_status_lock, flags);
}
- return false;
+ if (atomic_dec_return(&ploop->service_pios) < MERGE_PIOS_MAX / 2)
+ wake_up(&ploop->service_wq);
}
-static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
- struct ploop_cmd *cmd)
+static int process_merge_latest_snapshot(struct ploop *ploop)
{
- unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
- u8 level;
-
- if (cmd->retval)
- goto out;
-
- while (iter_delta_clusters(ploop, cmd)) {
- /*
- * We are in kwork, so bat_rwlock is not needed
- * (we can't race with changing BAT, since cmds
- * are processed before bios and piwb is sync).
- */
- /* FIXME: Optimize this: ploop_bat_entries() is overkill */
- dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
+ static blk_status_t service_status;
+ struct bio_vec bvec = {0};
+ struct pio *pio;
+ int ret = 0;
+ u32 clu;
- /* Check we can submit one more cow in parallel */
- if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0))
- return;
- /*
- * This adds cluster lk. Further write bios to *cluster will go
- * from ploop_map to kwork (because bat_levels[*cluster] is not
- * top_level()), so they will see the lk.
- */
- if (submit_cluster_cow(ploop, level, *cluster, dst_cluster,
- ploop_queue_deferred_cmd_wrapper, cmd)) {
- atomic_inc(&cmd->merge.nr_available);
- cmd->retval = -ENOMEM;
- goto out;
+ for (clu = 0; clu < ploop->nr_bat_entries; clu++) {
+ if (fatal_signal_pending(current)) {
+ ret = -EINTR;
+ break;
+ }
+ pio = kmalloc(sizeof(*pio), GFP_KERNEL);
+ if (!pio) {
+ ret = -ENOMEM;
+ break;
+ }
+ init_pio(ploop, REQ_OP_WRITE, pio);
+ pio->free_on_endio = true;
+ pio->bi_io_vec = &bvec;
+ pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu);
+ pio->bi_iter.bi_size = 0;
+ pio->bi_iter.bi_idx = 0;
+ pio->bi_iter.bi_bvec_done = 0;
+ pio->endio_cb = service_pio_endio;
+ pio->endio_cb_data = &service_status;
+ pio->is_fake_merge = true;
+ WARN_ON_ONCE(!fake_merge_pio(pio));
+
+ defer_pios(ploop, pio, NULL);
+
+ if (atomic_inc_return(&ploop->service_pios) == MERGE_PIOS_MAX) {
+ wait_event(ploop->service_wq,
+ atomic_read(&ploop->service_pios) < MERGE_PIOS_MAX);
}
- ++*cluster;
+ if (unlikely(READ_ONCE(service_status)))
+ break;
}
-out:
- if (atomic_read(&cmd->merge.nr_available) != NR_MERGE_BIOS) {
- /* Wait till last COW queues us */
- return;
+
+ wait_event(ploop->service_wq, !atomic_read(&ploop->service_pios));
+ if (!ret) {
+ spin_lock_irq(&ploop->err_status_lock);
+ ret = blk_status_to_errno(service_status);
+ spin_unlock_irq(&ploop->err_status_lock);
}
- complete(&cmd->comp); /* Last touch of cmd memory */
+ return ret;
}
static int ploop_merge_latest_snapshot(struct ploop *ploop)
{
- struct ploop_cmd cmd;
struct file *file;
u8 level;
int ret;
@@ -680,33 +637,14 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
return -EROFS;
if (ploop->nr_deltas < 2)
return -ENOENT;
-again:
- memset(&cmd, 0, sizeof(cmd));
- cmd.type = PLOOP_CMD_MERGE_SNAPSHOT;
- cmd.ploop = ploop;
- atomic_set(&cmd.merge.nr_available, NR_MERGE_BIOS);
-
- init_completion(&cmd.comp);
- ploop_queue_deferred_cmd(ploop, &cmd);
- ret = wait_for_completion_interruptible(&cmd.comp);
- if (ret) {
- /*
- * process_merge_latest_snapshot_cmd() will see this
- * later or earlier. Take a lock if you want earlier.
- */
- WRITE_ONCE(cmd.retval, -EINTR);
- wait_for_completion(&cmd.comp);
- }
- if (cmd.retval)
+ ret = process_merge_latest_snapshot(ploop);
+ if (ret)
goto out;
- if (cmd.merge.do_repeat)
- goto again;
-
/* Delta merged. Release delta's file */
- cmd.retval = ploop_suspend_submitting_pios(ploop);
- if (cmd.retval)
+ ret = ploop_suspend_submitting_pios(ploop);
+ if (ret)
goto out;
write_lock_irq(&ploop->bat_rwlock);
@@ -719,7 +657,7 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
ploop_resume_submitting_pios(ploop);
out:
- return cmd.retval;
+ return ret;
}
static void notify_delta_merged(struct ploop *ploop, u8 level,
@@ -1154,28 +1092,6 @@ static int ploop_flip_upper_deltas(struct ploop *ploop)
return process_flip_upper_deltas(ploop);
}
-/* Handle user commands requested via "message" interface */
-void process_deferred_cmd(struct ploop *ploop)
- __releases(&ploop->deferred_lock)
- __acquires(&ploop->deferred_lock)
-{
- struct ploop_cmd *cmd = ploop->deferred_cmd;
-
- if (likely(!cmd))
- return;
-
- ploop->deferred_cmd = NULL;
- spin_unlock_irq(&ploop->deferred_lock);
-
- if (cmd->type == PLOOP_CMD_MERGE_SNAPSHOT) {
- process_merge_latest_snapshot_cmd(ploop, cmd);
- } else {
- cmd->retval = -EINVAL;
- complete(&cmd->comp);
- }
- spin_lock_irq(&ploop->deferred_lock);
-}
-
static int ploop_get_event(struct ploop *ploop, char *result, unsigned int maxlen)
{
unsigned int sz = 0;
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index bcdc63a1d5c9..dc2268670f70 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -70,6 +70,7 @@ void init_pio(struct ploop *ploop, unsigned int bi_op, struct pio *pio)
pio->bi_op = bi_op;
pio->wants_discard_index_cleanup = false;
pio->is_data_alloc = false;
+ pio->is_fake_merge = false;
pio->free_on_endio = false;
pio->ref_index = PLOOP_REF_INDEX_INVALID;
pio->bi_status = BLK_STS_OK;
@@ -478,6 +479,14 @@ static bool pio_endio_if_all_zeros(struct pio *pio)
return true;
}
+static bool pio_endio_if_merge_fake_pio(struct pio *pio)
+{
+ if (likely(!fake_merge_pio(pio)))
+ return false;
+ pio_endio(pio);
+ return true;
+}
+
static int punch_hole(struct file *file, loff_t pos, loff_t len)
{
return vfs_fallocate(file, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
@@ -1136,9 +1145,9 @@ static bool postpone_if_cluster_locked(struct ploop *ploop, struct pio *pio,
return e_h != NULL;
}
-int submit_cluster_cow(struct ploop *ploop, unsigned int level,
- unsigned int cluster, unsigned int dst_cluster,
- void (*end_fn)(struct ploop *, int, void *), void *data)
+static int submit_cluster_cow(struct ploop *ploop, unsigned int level,
+ unsigned int cluster, unsigned int dst_cluster,
+ void (*end_fn)(struct ploop *, int, void *), void *data)
{
struct ploop_cow *cow = NULL;
struct pio *pio = NULL;
@@ -1392,6 +1401,8 @@ static int process_one_deferred_bio(struct ploop *ploop, struct pio *pio,
if (cluster_is_in_top_delta(ploop, cluster)) {
/* Already mapped */
+ if (pio_endio_if_merge_fake_pio(pio))
+ goto out;
goto queue;
} else if (!op_is_write(pio->bi_op)) {
/*
@@ -1538,13 +1549,10 @@ void do_ploop_work(struct work_struct *ws)
*
* Currenly, it's impossible to submit two bat pages update
* in parallel, since the update uses global ploop->bat_page.
- * Note, that process_deferred_cmd() expects there is no
- * pending index wb.
*/
ploop_index_wb_init(&piwb, ploop);
spin_lock_irq(&ploop->deferred_lock);
- process_deferred_cmd(ploop);
process_delta_wb(ploop, &piwb);
list_splice_init(&ploop->deferred_pios, &deferred_pios);
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index 76f66fe11de1..3e05895d1cfe 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -123,10 +123,9 @@ void free_md_pages_tree(struct rb_root *root)
static bool ploop_has_pending_activity(struct ploop *ploop)
{
- bool has;
+ bool has = false;
spin_lock_irq(&ploop->deferred_lock);
- has = ploop->deferred_cmd;
has |= !list_empty(&ploop->deferred_pios);
has |= !list_empty(&ploop->discard_pios);
has |= !list_empty(&ploop->delta_cow_action_list);
@@ -312,7 +311,9 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
}
rwlock_init(&ploop->bat_rwlock);
+ spin_lock_init(&ploop->err_status_lock);
init_rwsem(&ploop->ctl_rwsem);
+ init_waitqueue_head(&ploop->service_wq);
spin_lock_init(&ploop->inflight_lock);
spin_lock_init(&ploop->deferred_lock);
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 2a474e5d3cb6..a2d6866d99a5 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -40,12 +40,10 @@ struct ploop_delta {
bool is_raw;
};
+#define MERGE_PIOS_MAX 64
+
struct ploop_cmd {
-#define PLOOP_CMD_MERGE_SNAPSHOT 3
struct completion comp;
- struct ploop *ploop;
- unsigned int type;
- int retval;
union {
struct {
sector_t new_sectors;
@@ -62,12 +60,6 @@ struct ploop_cmd {
unsigned int cluster, dst_cluster;
struct pio *pio;
} resize;
- struct {
-#define NR_MERGE_BIOS 64
- atomic_t nr_available;
- unsigned int cluster; /* Currently iterated cluster */
- bool do_repeat;
- } merge;
};
};
@@ -173,8 +165,11 @@ struct ploop {
struct list_head resubmit_pios; /* After partial IO */
struct list_head enospc_pios; /* Delayed after ENOSPC */
+ atomic_t service_pios;
+ struct wait_queue_head service_wq;
+
+ spinlock_t err_status_lock;
struct rw_semaphore ctl_rwsem;
- struct ploop_cmd *deferred_cmd;
/*
* List of locked clusters (no write is possible).
@@ -230,6 +225,7 @@ struct pio {
bool is_data_alloc:1;
bool wants_discard_index_cleanup:1;
+ bool is_fake_merge:1;
bool free_on_endio:1;
/*
* 0 and 1 are related to inflight_bios_ref[],
@@ -486,6 +482,16 @@ static inline struct hlist_head *ploop_htable_slot(struct hlist_head head[], u32
return &head[hash_32(clu, PLOOP_HASH_TABLE_BITS)];
}
+static inline bool fake_merge_pio(struct pio *pio)
+{
+ if (pio->is_fake_merge) {
+ WARN_ON_ONCE(pio->bi_iter.bi_size ||
+ pio->bi_op != REQ_OP_WRITE);
+ return true;
+ }
+ return false;
+}
+
extern void md_page_insert(struct ploop *ploop, struct md_page *md);
extern void ploop_free_md_page(struct md_page *md);
extern void free_md_pages_tree(struct rb_root *root);
@@ -499,7 +505,6 @@ extern void defer_pios(struct ploop *ploop, struct pio *pio, struct list_head *p
extern void do_ploop_work(struct work_struct *ws);
extern void do_ploop_fsync_work(struct work_struct *ws);
extern void ploop_event_work(struct work_struct *work);
-extern void process_deferred_cmd(struct ploop *ploop);
extern int ploop_clone_and_map(struct dm_target *ti, struct request *rq,
union map_info *map_context, struct request **clone);
extern struct pio *find_lk_of_cluster(struct ploop *ploop, u32 cluster);
@@ -514,9 +519,6 @@ extern void ploop_reset_bat_update(struct ploop_index_wb *);
extern void ploop_submit_index_wb_sync(struct ploop *, struct ploop_index_wb *);
extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
char *result, unsigned int maxlen);
-extern int submit_cluster_cow(struct ploop *ploop, unsigned int level,
- unsigned int cluster, unsigned int dst_cluster,
- void (*end_fn)(struct ploop *, int, void *), void *data);
extern struct pio * alloc_pio_with_pages(struct ploop *ploop);
extern void free_pio_with_pages(struct ploop *ploop, struct pio *pio);
More information about the Devel
mailing list