[Devel] [PATCH VZ9 v3 06/13] dm-ploop: introduce per-md page locking
Andrey Zhadchenko
andrey.zhadchenko at virtuozzo.com
Thu Oct 24 20:23:34 MSK 2024
Currently we have a single bat_rwlock for the whole ploop. However,
runtime locking granularity can be reduced to a single metadata page.
In this patch, add an rwlock to the metadata structure and use it when
accessing md->bat_levels and md->page at the same time, to protect
readers against writers.
https://virtuozzo.atlassian.net/browse/VSTOR-91817
Signed-off-by: Andrey Zhadchenko <andrey.zhadchenko at virtuozzo.com>
---
drivers/md/dm-ploop-bat.c | 14 +++++++-----
drivers/md/dm-ploop-cmd.c | 48 ++++++++++++++++++++++++---------------
drivers/md/dm-ploop-map.c | 16 ++++++-------
drivers/md/dm-ploop.h | 6 +++++
4 files changed, 52 insertions(+), 32 deletions(-)
diff --git a/drivers/md/dm-ploop-bat.c b/drivers/md/dm-ploop-bat.c
index 655d0e4c91ab..a6202720927f 100644
--- a/drivers/md/dm-ploop-bat.c
+++ b/drivers/md/dm-ploop-bat.c
@@ -88,6 +88,7 @@ static struct md_page *ploop_alloc_md_page(u32 id)
md->page = page;
md->kmpage = kmap(page);
md->id = id;
+ rwlock_init(&md->lock);
return md;
err_page:
kfree(levels);
@@ -134,20 +135,23 @@ bool ploop_try_update_bat_entry(struct ploop *ploop, u32 clu, u8 level, u32 dst_
{
u32 *bat_entries, id = ploop_bat_clu_to_page_nr(clu);
struct md_page *md = ploop_md_page_find(ploop, id);
-
- lockdep_assert_held(&ploop->bat_rwlock);
+ unsigned long flags;
+ bool ret = false;
if (!md)
return false;
clu = ploop_bat_clu_idx_in_page(clu); /* relative offset */
+ write_lock_irqsave(&md->lock, flags);
if (READ_ONCE(md->bat_levels[clu]) == level) {
bat_entries = md->kmpage;
WRITE_ONCE(bat_entries[clu], dst_clu);
- return true;
+ ret = true;
}
- return false;
+ write_unlock_irqrestore(&md->lock, flags);
+
+ return ret;
}
/* Alloc holes_bitmap and set bits of free clusters */
@@ -411,7 +415,6 @@ static void ploop_apply_delta_mappings(struct ploop *ploop,
if (!is_raw)
d_md = ploop_md_first_entry(md_root);
- write_lock_irq(&ploop->bat_rwlock);
ploop_for_each_md_page(ploop, md, node) {
bat_entries = md->kmpage;
if (!is_raw)
@@ -455,7 +458,6 @@ static void ploop_apply_delta_mappings(struct ploop *ploop,
if (!is_raw)
d_md = ploop_md_next_entry(d_md);
}
- write_unlock_irq(&ploop->bat_rwlock);
}
int ploop_check_delta_length(struct ploop *ploop, struct file *file, loff_t *file_size)
diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index 006495d95db3..5df0c1f22fc5 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -27,6 +27,7 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
u32 i, end, size, dst_clu, *bat_entries;
struct rb_node *node;
struct md_page *md;
+ unsigned long flags;
/* This is called only once */
if (cmd->resize.stage != PLOOP_GROW_STAGE_INITIAL)
@@ -44,6 +45,8 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
ploop_for_each_md_page(ploop, md, node) {
ploop_init_be_iter(ploop, md->id, &i, &end);
bat_entries = md->kmpage;
+
+ read_lock_irqsave(&md->lock, flags);
for (; i <= end; i++) {
if (!ploop_md_page_cluster_is_in_top_delta(ploop, md, i))
continue;
@@ -54,6 +57,7 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
ploop_hole_clear_bit(dst_clu, ploop);
}
}
+ read_unlock_irqrestore(&md->lock, flags);
}
write_unlock_irq(&ploop->bat_rwlock);
}
@@ -165,11 +169,13 @@ static u32 ploop_find_bat_entry(struct ploop *ploop, u32 dst_clu, bool *is_locke
u32 i, end, *bat_entries, clu = U32_MAX;
struct rb_node *node;
struct md_page *md;
+ unsigned long flags;
- read_lock_irq(&ploop->bat_rwlock);
ploop_for_each_md_page(ploop, md, node) {
ploop_init_be_iter(ploop, md->id, &i, &end);
bat_entries = md->kmpage;
+
+ read_lock_irqsave(&md->lock, flags);
for (; i <= end; i++) {
if (READ_ONCE(bat_entries[i]) != dst_clu)
continue;
@@ -178,10 +184,10 @@ static u32 ploop_find_bat_entry(struct ploop *ploop, u32 dst_clu, bool *is_locke
break;
}
}
+ read_unlock_irqrestore(&md->lock, flags);
if (clu != UINT_MAX)
break;
}
- read_unlock_irq(&ploop->bat_rwlock);
*is_locked = false;
if (clu != UINT_MAX) {
@@ -350,10 +356,8 @@ static int ploop_grow_relocate_cluster(struct ploop *ploop,
}
/* Update local BAT copy */
- write_lock_irq(&ploop->bat_rwlock);
WARN_ON(!ploop_try_update_bat_entry(ploop, clu,
ploop_top_level(ploop), new_dst));
- write_unlock_irq(&ploop->bat_rwlock);
not_occupied:
/*
* Now dst_clu is not referenced in BAT, so increase the value
@@ -703,12 +707,10 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
if (ret)
goto out;
- write_lock_irq(&ploop->bat_rwlock);
level = ploop->nr_deltas - 2;
file = ploop->deltas[level].file;
ploop->deltas[level] = ploop->deltas[level + 1];
ploop->nr_deltas--;
- write_unlock_irq(&ploop->bat_rwlock);
fput(file);
ploop_resume_submitting_pios(ploop);
@@ -726,15 +728,19 @@ static void notify_delta_merged(struct ploop *ploop, u8 level,
struct rb_node *node;
struct file *file;
bool stop = false;
+ unsigned long flags;
u32 clu;
d_md = ploop_md_first_entry(md_root);
- write_lock_irq(&ploop->bat_rwlock);
ploop_for_each_md_page(ploop, md, node) {
init_be_iter(nr_be, md->id, &i, &end);
bat_entries = md->kmpage;
d_bat_entries = d_md->kmpage;
+
+ write_lock_irqsave(&md->lock, flags);
+ write_lock(&d_md->lock);
+
for (; i <= end; i++) {
clu = ploop_page_clu_idx_to_bat_clu(md->id, i);
if (clu == nr_be - 1)
@@ -767,6 +773,10 @@ static void notify_delta_merged(struct ploop *ploop, u8 level,
else
WRITE_ONCE(md->bat_levels[i], level);
}
+
+ write_unlock(&d_md->lock);
+ write_unlock_irqrestore(&md->lock, flags);
+
if (stop)
break;
d_md = ploop_md_next_entry(d_md);
@@ -778,7 +788,6 @@ static void notify_delta_merged(struct ploop *ploop, u8 level,
ploop->deltas[i - 1] = ploop->deltas[i];
memset(&ploop->deltas[--ploop->nr_deltas], 0,
sizeof(struct ploop_delta));
- write_unlock_irq(&ploop->bat_rwlock);
fput(file);
}
@@ -789,7 +798,6 @@ static int ploop_process_update_delta_index(struct ploop *ploop, u8 level,
u32 clu, dst_clu, n;
int ret;
- write_lock_irq(&ploop->bat_rwlock);
/* Check all */
while (sscanf(map, "%u:%u;%n", &clu, &dst_clu, &n) == 2) {
/*
@@ -814,7 +822,6 @@ static int ploop_process_update_delta_index(struct ploop *ploop, u8 level,
}
ret = 0;
unlock:
- write_unlock_irq(&ploop->bat_rwlock);
return ret;
}
ALLOW_ERROR_INJECTION(ploop_process_update_delta_index, ERRNO);
@@ -905,12 +912,9 @@ static int ploop_get_delta_name_cmd(struct ploop *ploop, u8 level,
/*
* Nobody can change deltas in parallel, since
- * another cmds are prohibited, but do this
- * for uniformity.
+ * another cmds are prohibited
*/
- read_lock_irq(&ploop->bat_rwlock);
file = get_file(ploop->deltas[level].file);
- read_unlock_irq(&ploop->bat_rwlock);
p = file_path(file, result, maxlen);
if (p == ERR_PTR(-ENAMETOOLONG)) {
@@ -978,7 +982,11 @@ static int process_flip_upper_deltas(struct ploop *ploop)
bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
hb_nr = ploop->hb_nr;
- write_lock_irq(&ploop->bat_rwlock);
+ /*
+ * We can be here only if ploop is suspended:
+ * no other IO nor command is possible
+ */
+
/* Prepare holes_bitmap */
memset(holes_bitmap, 0xff, hb_nr/8);
for (i = (hb_nr & ~0x7); i < hb_nr; i++)
@@ -990,6 +998,7 @@ static int process_flip_upper_deltas(struct ploop *ploop)
ploop_for_each_md_page(ploop, md, node) {
ploop_init_be_iter(ploop, md->id, &i, &end);
bat_entries = md->kmpage;
+
for (; i <= end; i++) {
if (READ_ONCE(bat_entries[i]) == BAT_ENTRY_NONE)
continue;
@@ -1004,7 +1013,6 @@ static int process_flip_upper_deltas(struct ploop *ploop)
/* FIXME */
swap(ploop->deltas[level], ploop->deltas[level+1]);
- write_unlock_irq(&ploop->bat_rwlock);
return 0;
}
@@ -1043,6 +1051,7 @@ static int ploop_check_delta_before_flip(struct ploop *ploop, struct file *file)
struct md_page *md, *d_md;
struct rb_node *node;
bool stop = false;
+ unsigned long flags;
ret = ploop_read_delta_metadata(ploop, file, &md_root, &nr_be);
if (ret) {
@@ -1053,10 +1062,12 @@ static int ploop_check_delta_before_flip(struct ploop *ploop, struct file *file)
/* Points to hdr since md_page[0] also contains hdr. */
d_md = ploop_md_first_entry(&md_root);
- write_lock_irq(&ploop->bat_rwlock);
ploop_for_each_md_page(ploop, md, node) {
init_be_iter(nr_be, md->id, &i, &end);
d_bat_entries = d_md->kmpage;
+
+ read_lock_irqsave(&md->lock, flags);
+ read_lock(&d_md->lock);
for (; i <= end; i++) {
if (ploop_md_page_cluster_is_in_top_delta(ploop, md, i) &&
d_bat_entries[i] != BAT_ENTRY_NONE) {
@@ -1065,6 +1076,8 @@ static int ploop_check_delta_before_flip(struct ploop *ploop, struct file *file)
goto unmap;
}
}
+ read_unlock(&d_md->lock);
+ read_unlock_irqrestore(&md->lock, flags);
clu = ploop_page_clu_idx_to_bat_clu(md->id, i);
if (clu == nr_be - 1) {
@@ -1077,7 +1090,6 @@ static int ploop_check_delta_before_flip(struct ploop *ploop, struct file *file)
d_md = ploop_md_next_entry(d_md);
}
- write_unlock_irq(&ploop->bat_rwlock);
ploop_free_md_pages_tree(&md_root);
out:
#endif
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 3e774bdafb4a..b160dd00e497 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -373,13 +373,13 @@ static bool ploop_delay_if_md_busy(struct ploop *ploop, struct md_page *md,
WARN_ON_ONCE(!list_empty(&pio->list));
/* lock protects piwb */
- read_lock_irqsave(&ploop->bat_rwlock, flags);
+ read_lock_irqsave(&md->lock, flags);
piwb = md->piwb;
if (piwb && (piwb->type != type || test_bit(MD_WRITEBACK, &md->status))) {
llist_add((struct llist_node *)(&pio->list), &md->wait_llist);
busy = true;
}
- read_unlock_irqrestore(&ploop->bat_rwlock, flags);
+ read_unlock_irqrestore(&md->lock, flags);
return busy;
}
@@ -788,9 +788,9 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
WARN_ON_ONCE(!test_bit(MD_WRITEBACK, &md->status));
clear_bit(MD_WRITEBACK, &md->status);
/* protect piwb */
- write_lock_irqsave(&ploop->bat_rwlock, flags);
+ write_lock_irqsave(&md->lock, flags);
md->piwb = NULL;
- write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+ write_unlock_irqrestore(&md->lock, flags);
wait_llist_pending = llist_del_all(&md->wait_llist);
if (wait_llist_pending) {
@@ -907,10 +907,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
bat_entries = md->kmpage;
- write_lock_irq(&ploop->bat_rwlock);
+ write_lock_irq(&md->lock);
md->piwb = piwb;
piwb->md = md;
- write_unlock_irq(&ploop->bat_rwlock);
+ write_unlock_irq(&md->lock);
piwb->page_id = page_id;
to = piwb->kmpage;
@@ -954,10 +954,10 @@ void ploop_break_bat_update(struct ploop *ploop, struct md_page *md)
struct ploop_index_wb *piwb;
unsigned long flags;
- write_lock_irqsave(&ploop->bat_rwlock, flags);
+ write_lock_irqsave(&md->lock, flags);
piwb = md->piwb;
md->piwb = NULL;
- write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+ write_unlock_irqrestore(&md->lock, flags);
ploop_free_piwb(piwb);
}
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index bd4906e4c2b5..98dd472a815e 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -125,6 +125,8 @@ struct md_page {
struct llist_node wb_llink;
struct ploop_index_wb *piwb;
+
+ rwlock_t lock;
};
enum {
@@ -418,6 +420,7 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, u32 clu,
{
u32 *bat_entries, dst_clu, id;
struct md_page *md;
+ unsigned long flags;
id = ploop_bat_clu_to_page_nr(clu);
md = ploop_md_page_find(ploop, id);
@@ -426,6 +429,7 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, u32 clu,
/* Cluster index related to the page[page_id] start */
clu = ploop_bat_clu_idx_in_page(clu);
+ read_lock_irqsave(&md->lock, flags);
if (bat_level)
*bat_level = READ_ONCE(md->bat_levels[clu]);
if (md_ret)
@@ -433,6 +437,8 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, u32 clu,
bat_entries = md->kmpage;
dst_clu = READ_ONCE(bat_entries[clu]);
+ read_unlock_irqrestore(&md->lock, flags);
+
return dst_clu;
}
--
2.39.3
More information about the Devel
mailing list