[Devel] [PATCH RH8 v3] dm: Interpreter of ploop1 format (ploop driver)

Konstantin Khorenko khorenko at virtuozzo.com
Fri Oct 18 14:23:57 MSK 2019


Applied, thank you.

--
Best regards,

Konstantin Khorenko,
Virtuozzo Linux Kernel Team

On 10/17/2019 06:24 PM, Kirill Tkhai wrote:
> The idea is to add a simple layer on top of a block device,
> which dispatches requests according to the ploop1 format.
> Most bios are dispatched by ploop_map(), which just
> replaces a bio's target cluster with the corresponding
> bat_entries[cluster] value.
>
> If a cluster is not present in the BAT, the kwork
> is woken up, and it handles all the work of allocating
> a new cluster and updating the BAT. Requests going
> to RO deltas are also served by the kwork (since it's
> impossible to start aio from the ploop_map() context).
>
> Service operations are also performed from the kwork, so
> we can often avoid additional synchronization between them
> and the cluster allocation code. See do_ploop_work() for
> details of all of this.
>
> ----
> Full backup, snapshot attaching and merging, tracking, cluster COW
> and device resize are implemented.
>
> TODO: partial backup and preemption of cached bat.
>
> ----
> 1)Start:
> losetup /dev/loop0 delta_N+1.img  (current image)
> dmsetup create dm_ploop --table "0 <size_in_sectors> ploop <cluster_log> /dev/loop0"
> ./add_delta.sh delta_0.img   (oldest snapshot)
> ...
> ./add_delta.sh delta_N.img   (newest snapshot)
> mount /dev/mapper/dm_ploop /mount_point
>
> Where: add_delta.sh is:
>
> #!/bin/bash
> if [ "$#" -ne 1 ];
> then
>         echo "Wrong arguments number"
>         exit 1
> fi
> if [ ! -f "$1" ]; then
>         echo "$1 does not exist"
>         exit 1
> fi
> exec {var}<$1
> dmsetup message dm_ploop 0 add_delta $var
>
> (add_delta requires an open fd of the new delta. This bash script
>  opens the file and passes the fd to the dmsetup utility).
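>
> For example (illustrative numbers and file names): with the 1M cluster size
> the driver currently hardcodes, <cluster_log> is 11 (2048 sectors per
> cluster), and a 10 GiB device is 20971520 sectors:
>
> losetup /dev/loop0 delta_1.img            (current image)
> dmsetup create dm_ploop --table "0 20971520 ploop 11 /dev/loop0"
> ./add_delta.sh delta_0.img                (only snapshot)
> mount /dev/mapper/dm_ploop /mount_point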
>
> 2)Alternative start. You may pass the delta fds to dmsetup create (an external bash script is required -- not attached; an illustrative sketch follows the commands):
>
> losetup /dev/loop0 delta_N+1.img  (current image)
> dmsetup create dm_ploop --table "0 <size_in_sectors> ploop <cluster_log> /dev/loop0 <delta_0_fd> .. <delta_N_fd>"
> mount /dev/mapper/dm_ploop /mount_point
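>
> A minimal sketch of such a wrapper (this is not the script referred to above,
> just an illustration of passing open delta fds on the command line; it assumes
> the 1M cluster size, i.e. cluster_log 11):
>
> #!/bin/bash
> # Usage: ./create_ploop.sh <size_in_sectors> <current_image> <delta_0> ... <delta_N>
> size=$1; top=$2; shift 2
> fds=""
> for delta in "$@"; do              # oldest to newest
>         exec {fd}<"$delta"         # fds stay open and are inherited by dmsetup
>         fds="$fds $fd"
> done
> losetup /dev/loop0 "$top"
> dmsetup create dm_ploop --table "0 $size ploop 11 /dev/loop0$fds"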
>
> 3)Resize of a running device.
>
> dmsetup message dm_ploop 0 resize <new_size_in_sectors>
> dmsetup suspend dm_ploop
> dmsetup reload dm_ploop --table "0 <new_size_in_sectors> ploop <cluster_log> /dev/loop0 <delta_0_fd> .. <delta_N_fd>"
> dmsetup resume dm_ploop
>
> (Currently dm-core can't change the size of a mapped device w/o reloading --
>  although there is no strict limitation on this -- it should be implemented).
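>
> For example (illustrative; assumes the 1M cluster size, so the new size must
> be a multiple of 2048 sectors), growing the device to 20 GiB (41943040 sectors):
>
> dmsetup message dm_ploop 0 resize 41943040
> dmsetup suspend dm_ploop
> dmsetup reload dm_ploop --table "0 41943040 ploop 11 /dev/loop0 <delta_0_fd> .. <delta_N_fd>"
> dmsetup resume dm_ploop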
>
> 4)Merge of latest snapshot forward
>
> $dmsetup message dm_ploop 0 merge
>
> 5)Notify that userspace has merged an intermediate snapshot
>
> $dmsetup message dm_ploop 0 notify_merged_forward <index>
>
> or
>
> $dmsetup message dm_ploop 0 notify_merged_backward <index>
>
> 6)Snapshot (switch top_delta)
>
> $dmsetup message dm_ploop 0 snapshot <old_top_delta_fd> <loop_attached_to_new_top_delta>
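>
> For example (illustrative names; the new top delta image is assumed to be
> already created and initialized by userspace):
>
> losetup /dev/loop1 delta_N+2.img      (new empty top delta)
> exec {old}<delta_N+1.img              (old top image, becomes a RO delta)
> dmsetup message dm_ploop 0 snapshot $old /dev/loop1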
>
> 7)Tracking
>
> $dmsetup message dm_ploop 0 tracking_start
> $dmsetup message dm_ploop 0 tracking_stop
> $dmsetup message dm_ploop 0 tracking_get_next (returns cluster or -EAGAIN if no dirty blocks)
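>
> A possible polling loop (sketch; it assumes dmsetup prints the message result
> and exits non-zero once the driver returns -EAGAIN):
>
> dmsetup message dm_ploop 0 tracking_start
> # ... copy the device out, then drain the clusters dirtied meanwhile:
> while cluster=$(dmsetup message dm_ploop 0 tracking_get_next 2>/dev/null); do
>         echo "dirty cluster: $cluster"
> done
> dmsetup message dm_ploop 0 tracking_stop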
>
> 8)Update RO delta index
>
> $dmsetup message dm_ploop 0 update_delta_index <level> <map>
>
> where <map> is cluster1:dst_cluster1;cluster2:dst_cluster2;
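>
> For example (illustrative values), to record that, in the delta at level 1,
> virtual cluster 0 is now stored in cluster 4 of that delta file and virtual
> cluster 2 in cluster 5:
>
> $dmsetup message dm_ploop 0 update_delta_index 1 "0:4;2:5;"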
>
> 9)Prohibit resume
>
> $dmsetup message dm_ploop 0 set_noresume 1
>
> (or set_noresume 0 to enable it back)
>
> 10)Flip upper delta
>
> Suppose we have Base.img and TopDelta.img
>
> $dmsetup message dm_ploop 0 flip_upper_deltas <loop-attached-to-Base.img> <TopDelta-fd>
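>
> For example (illustrative; Base.img is attached to a loop device, and the
> TopDelta.img fd is passed the same way as in add_delta.sh):
>
> losetup /dev/loop1 Base.img
> exec {fd}<TopDelta.img
> dmsetup message dm_ploop 0 flip_upper_deltas /dev/loop1 $fd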
>
> 11)Push backup
>
> Start:	$dmsetup message dm_ploop 0 push_backup_start <UUID> <addr_of_user_mask>
> 	(currently only <addr_of_user_mask> == 0 is supported)
>
> Stop:	$dmsetup message dm_ploop 0 push_backup_stop <UUID>
>
> Get UUID: $dmsetup message dm_ploop 0 push_backup_get_uuid
>
> Read:	$dmsetup message dm_ploop 0 push_backup_read <UUID>
>
>         returns an extent: the start cluster and the number of following clusters (separated by ":")
>
> Write:	$dmsetup message dm_ploop 0 push_backup_write <UUID> <cluster>:<nr_clusters>
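>
> One iteration of the read/write handshake might look like this (sketch;
> assumes dmsetup prints the message result; backup_extent is a hypothetical
> helper that copies the clusters out):
>
> uuid=$(dmsetup message dm_ploop 0 push_backup_get_uuid)
> extent=$(dmsetup message dm_ploop 0 push_backup_read "$uuid")   # e.g. "17:3"
> cluster=${extent%%:*}
> nr=${extent##*:}
> backup_extent "$cluster" "$nr"
> dmsetup message dm_ploop 0 push_backup_write "$uuid" "$cluster:$nr"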
>
> Signed-off-by: Kirill Tkhai <ktkhai at virtuozzo.com>
> ---
>  drivers/md/Kconfig           |    6
>  drivers/md/Makefile          |    3
>  drivers/md/dm-ploop-bat.c    |  302 +++++++
>  drivers/md/dm-ploop-cmd.c    | 1638 ++++++++++++++++++++++++++++++++++++++++
>  drivers/md/dm-ploop-map.c    | 1719 ++++++++++++++++++++++++++++++++++++++++++
>  drivers/md/dm-ploop-target.c |  318 ++++++++
>  drivers/md/dm-ploop.h        |  388 +++++++++
>  7 files changed, 4374 insertions(+)
>  create mode 100644 drivers/md/dm-ploop-bat.c
>  create mode 100644 drivers/md/dm-ploop-cmd.c
>  create mode 100644 drivers/md/dm-ploop-map.c
>  create mode 100644 drivers/md/dm-ploop-target.c
>  create mode 100644 drivers/md/dm-ploop.h
>
> diff --git a/drivers/md/Kconfig b/drivers/md/Kconfig
> index 3db222509e44..46c55982133e 100644
> --- a/drivers/md/Kconfig
> +++ b/drivers/md/Kconfig
> @@ -529,6 +529,12 @@ config DM_INTEGRITY
>  	  To compile this code as a module, choose M here: the module will
>  	  be called dm-integrity.
>
> +config DM_PLOOP
> +	tristate "Ploop target support"
> +	depends on BLK_DEV_DM
> +	---help---
> +          This is the ploop1 format interpreter on device-mapper rails.
> +
>  config DM_ZONED
>  	tristate "Drive-managed zoned block device target support"
>  	depends on BLK_DEV_DM
> diff --git a/drivers/md/Makefile b/drivers/md/Makefile
> index 822f4e8753bc..8b87a11895c2 100644
> --- a/drivers/md/Makefile
> +++ b/drivers/md/Makefile
> @@ -18,6 +18,8 @@ dm-cache-y	+= dm-cache-target.o dm-cache-metadata.o dm-cache-policy.o \
>  		    dm-cache-background-tracker.o
>  dm-cache-smq-y   += dm-cache-policy-smq.o
>  dm-era-y	+= dm-era-target.o
> +ploop-y		+= dm-ploop-target.o dm-ploop-map.o dm-ploop-cmd.o \
> +		    dm-ploop-bat.o
>  dm-verity-y	+= dm-verity-target.o
>  md-mod-y	+= md.o md-bitmap.o
>  raid456-y	+= raid5.o raid5-cache.o raid5-ppl.o
> @@ -64,6 +66,7 @@ obj-$(CONFIG_DM_VERITY)		+= dm-verity.o
>  obj-$(CONFIG_DM_CACHE)		+= dm-cache.o
>  obj-$(CONFIG_DM_CACHE_SMQ)	+= dm-cache-smq.o
>  obj-$(CONFIG_DM_ERA)		+= dm-era.o
> +obj-$(CONFIG_DM_PLOOP)		+= ploop.o
>  obj-$(CONFIG_DM_LOG_WRITES)	+= dm-log-writes.o
>  obj-$(CONFIG_DM_INTEGRITY)	+= dm-integrity.o
>  obj-$(CONFIG_DM_ZONED)		+= dm-zoned.o
> diff --git a/drivers/md/dm-ploop-bat.c b/drivers/md/dm-ploop-bat.c
> new file mode 100644
> index 000000000000..82b889ea1b0e
> --- /dev/null
> +++ b/drivers/md/dm-ploop-bat.c
> @@ -0,0 +1,302 @@
> +#include <linux/init.h>
> +#include <linux/uio.h>
> +#include <linux/mm.h>
> +#include "dm-ploop.h"
> +
> +/*
> + * Read from disk and fill bat_entries[]. Note that on entry here, cluster #0
> + * is already read from disk (with the header) -- just parse the bio pages' content.
> + */
> +static int ploop_read_bat(struct ploop *ploop, struct bio *bio)
> +{
> +	unsigned int entries_per_page, nr_copy, page, i = 0;
> +	map_index_t *addr, off, cluster = 0;
> +	int ret = 0;
> +
> +	entries_per_page = PAGE_SIZE / sizeof(map_index_t);
> +
> +	do {
> +		for (page = 0; page < nr_pages_in_cluster(ploop); page++) {
> +			if (i == 0)
> +				off = PLOOP_MAP_OFFSET;
> +			else
> +				off = 0;
> +
> +			nr_copy = entries_per_page - off;
> +			if (i + nr_copy > ploop->nr_bat_entries)
> +				nr_copy = ploop->nr_bat_entries - i;
> +
> +			addr = kmap_atomic(bio->bi_io_vec[page].bv_page);
> +			memcpy(&ploop->bat_entries[i], addr + off,
> +				nr_copy * sizeof(map_index_t));
> +			kunmap_atomic(addr);
> +			i += nr_copy;
> +
> +			if (i >= ploop->nr_bat_entries)
> +				goto out;
> +		}
> +
> +		ret = ploop_read_cluster_sync(ploop, bio, ++cluster);
> +		if (ret)
> +			goto err;
> +
> +	} while (1);
> +
> +out:
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (ploop->bat_entries[i] == BAT_ENTRY_NONE) {
> +			ret = -EINVAL;
> +			goto err;
> +		}
> +		if (!ploop->bat_entries[i])
> +			ploop->bat_entries[i] = BAT_ENTRY_NONE;
> +	}
> +
> +err:
> +	return ret;
> +}
> +
> +/* Alloc holes_bitmap and set bits of free clusters */
> +static int ploop_assign_hb_and_levels(struct ploop *ploop,
> +				      unsigned int bat_clusters)
> +{
> +	unsigned int i, size, dst_cluster;
> +
> +	/*
> +	 * + number of data clusters.
> +	 * Note that after a shrink of a large disk, ploop->bat_entries[x] may
> +	 * refer outside of [0, ploop->hb_nr-1], and we never allocate
> +	 * holes_bitmap for such clusters. Just remember to skip these
> +	 * clusters after discard frees them.
> +	 */
> +	ploop->hb_nr = bat_clusters + ploop->nr_bat_entries;
> +	size = round_up(DIV_ROUND_UP(ploop->hb_nr, 8), sizeof(unsigned long));
> +
> +	/* holes_bitmap numbers are relative to the start of the file */
> +	ploop->holes_bitmap = kvmalloc(size, GFP_KERNEL);
> +	if (!ploop->holes_bitmap)
> +		return -ENOMEM;
> +	memset(ploop->holes_bitmap, 0xff, size);
> +
> +	size = ploop->nr_bat_entries * sizeof(ploop->bat_levels[0]);
> +	ploop->bat_levels = kvzalloc(size, GFP_KERNEL);
> +	if (!ploop->bat_levels)
> +		return -ENOMEM;
> +
> +	/* Mark all BAT clusters as occupied. */
> +	for (i = 0; i < bat_clusters; i++)
> +		ploop_hole_clear_bit(i, ploop);
> +
> +	/*
> +	 * Clear all clusters, which are referred to in BAT, from holes_bitmap.
> +	 * Set bat_levels[] to top delta's level.
> +	 */
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		dst_cluster = ploop->bat_entries[i];
> +		if (dst_cluster != BAT_ENTRY_NONE) {
> +			ploop->bat_levels[i] = BAT_LEVEL_TOP;
> +			/* Cluster may refer outside holes_bitmap after shrinking */
> +			if (dst_cluster < ploop->hb_nr)
> +				ploop_hole_clear_bit(dst_cluster, ploop);
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * Allocate memory for bat_entries, bat_levels and holes_bitmap,
> + * and read their content from disk.
> + */
> +int ploop_read_metadata(struct dm_target *ti, struct ploop *ploop)
> +{
> +	unsigned int bat_clusters, offset_clusters, cluster_log;
> +	struct ploop_pvd_header *m_hdr = NULL;
> +	unsigned long size;
> +	struct bio *bio;
> +	int ret;
> +	void *data;
> +
> +	/* Hardcode cluster size 1M for now ... FIXME */
> +	cluster_log = ploop->cluster_log;
> +	if (cluster_log != 11)
> +		return -ENOTSUPP;
> +
> +	bio = alloc_bio_with_pages(ploop);
> +	if (!bio)
> +		return -ENOMEM;
> +
> +	ret = ploop_read_cluster_sync(ploop, bio, 0);
> +	if (ret < 0)
> +		goto out;
> +
> +	m_hdr = kmap_atomic(bio->bi_io_vec[0].bv_page);
> +
> +	ret = -ENOTSUPP;
> +	if (strncmp(m_hdr->m_Sig, "WithouFreSpacExt", 16))
> +		goto out;
> +
> +	ret = -ENOLCK;
> +#if 0	/* Wait till Igor implements PSBM-98203 */
> +	if (m_hdr->m_DiskInUse != cpu_to_le32(SIGNATURE_DISK_IN_USE) &&
> +	    ploop_is_ro(ploop))
> +		goto out;
> +#endif
> +
> +	ret = -EINVAL;
> +	if (le32_to_cpu(m_hdr->m_Sectors) != 1 << cluster_log)
> +		goto out;
> +
> +	ploop->nr_bat_entries = le32_to_cpu(m_hdr->m_Size);
> +
> +	/* Header and BAT-occupied clusters at start of file */
> +	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
> +	bat_clusters = DIV_ROUND_UP(size, 1 << (cluster_log + 9));
> +
> +	/* Clusters from start of file to first data block */
> +	offset_clusters = le32_to_cpu(m_hdr->m_FirstBlockOffset) >> cluster_log;
> +	if (bat_clusters != offset_clusters) {
> +		pr_err("ploop: custom FirstBlockOffset\n");
> +		goto out;
> +	}
> +
> +	ret = -ENOMEM;
> +	/*
> +	 * Memory for the hdr and the array of BAT mappings. We keep them
> +	 * adjacent, just as they are stored on disk, to simplify
> +	 * the BAT update code.
> +	 */
> +	data = vmalloc(size);
> +	if (!data)
> +		goto out;
> +	BUG_ON((unsigned long)data & ~PAGE_MASK);
> +
> +	memcpy(data, m_hdr, sizeof(*m_hdr));
> +	ploop->hdr = data;
> +	ploop->bat_entries = data + sizeof(*m_hdr);
> +	kunmap_atomic(m_hdr);
> +	m_hdr = NULL;
> +
> +	ret = ploop_read_bat(ploop, bio);
> +	if (ret)
> +		goto out;
> +
> +	ret = ploop_assign_hb_and_levels(ploop, bat_clusters);
> +out:
> +	if (m_hdr)
> +		kunmap_atomic(m_hdr);
> +	free_bio_with_pages(ploop, bio);
> +	return ret;
> +}
> +
> +static int ploop_delta_check_header(struct ploop *ploop, struct page *page,
> +		       unsigned int *nr_pages, unsigned int *last_page_len)
> +{
> +	unsigned int bytes, delta_nr_be, offset_clusters, bat_clusters, cluster_log;
> +	struct ploop_pvd_header *hdr;
> +	u64 size, delta_size;
> +	int ret = -EPROTO;
> +
> +	hdr = kmap_atomic(page);
> +
> +	if (memcmp(hdr->m_Sig, ploop->hdr->m_Sig, sizeof(hdr->m_Sig)) ||
> +	    hdr->m_Sectors != ploop->hdr->m_Sectors ||
> +	    hdr->m_Type != ploop->hdr->m_Type)
> +		goto out;
> +
> +	delta_size = le64_to_cpu(hdr->m_SizeInSectors_v2);
> +	delta_nr_be = le32_to_cpu(hdr->m_Size);
> +	size = ploop->hdr->m_SizeInSectors_v2;
> +	cluster_log = ploop->cluster_log;
> +	offset_clusters = le32_to_cpu(hdr->m_FirstBlockOffset) >> cluster_log;
> +	bytes = (PLOOP_MAP_OFFSET + delta_nr_be) * sizeof(map_index_t);
> +	bat_clusters = DIV_ROUND_UP(bytes, 1 << (cluster_log + 9));
> +
> +	if (delta_size > size || delta_nr_be > ploop->nr_bat_entries ||
> +	    bat_clusters != offset_clusters)
> +		goto out;
> +
> +	*nr_pages = DIV_ROUND_UP(bytes, PAGE_SIZE);
> +	bytes &= ~PAGE_MASK;
> +	*last_page_len = bytes ? : PAGE_SIZE;
> +	ret = 0;
> +out:
> +	kunmap_atomic(hdr);
> +	return ret;
> +}
> +
> +int ploop_read_delta_metadata(struct ploop *ploop, struct file *file,
> +			      void **d_hdr)
> +{
> +	unsigned int i, last_page_len, size, nr_pages = 1;
> +	unsigned int *delta_bat_entries;
> +	struct iov_iter iter;
> +	struct bio_vec bvec;
> +	struct page *page;
> +	ssize_t len;
> +	void *from;
> +	loff_t pos;
> +	int ret;
> +
> +	page = alloc_page(GFP_KERNEL);
> +	if (!page)
> +		return -ENOMEM;
> +
> +	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
> +	*d_hdr = vzalloc(size);
> +	if (!*d_hdr) {
> +		ret = -ENOMEM;
> +		goto out_put_page;
> +	}
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		bvec.bv_page = page;
> +		bvec.bv_len = PAGE_SIZE;
> +		bvec.bv_offset = 0;
> +
> +		iov_iter_bvec(&iter, READ|ITER_BVEC, &bvec, 1, bvec.bv_len);
> +		pos = i << PAGE_SHIFT;
> +
> +		len = vfs_iter_read(file, &iter, &pos, 0);
> +		if (len != PAGE_SIZE) {
> +			ret = len < 0 ? (int)len : -ENODATA;
> +			goto out_vfree;
> +		}
> +
> +		if (i == 0) {
> +			/* First page with header. Updates nr_pages. */
> +			ret = ploop_delta_check_header(ploop, page,
> +					&nr_pages, &last_page_len);
> +			if (ret)
> +				goto out_vfree;
> +		}
> +
> +		if (i + 1 == nr_pages) {
> +			/* Last page, possibly incomplete */
> +			len = last_page_len;
> +		}
> +
> +		from = kmap_atomic(page);
> +		memcpy(*d_hdr + (i << PAGE_SHIFT), from, len);
> +		kunmap_atomic(from);
> +	}
> +
> +	delta_bat_entries = *d_hdr + PLOOP_MAP_OFFSET * sizeof(map_index_t);
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (delta_bat_entries[i] == BAT_ENTRY_NONE) {
> +			ret = -EPROTO;
> +			goto out_vfree;
> +		}
> +		if (!delta_bat_entries[i])
> +			delta_bat_entries[i] = BAT_ENTRY_NONE;
> +	}
> +
> +out_vfree:
> +	if (ret) {
> +		vfree(*d_hdr);
> +		*d_hdr = NULL;
> +	}
> +out_put_page:
> +	put_page(page);
> +	return ret;
> +}
> diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
> new file mode 100644
> index 000000000000..bde9ae11a625
> --- /dev/null
> +++ b/drivers/md/dm-ploop-cmd.c
> @@ -0,0 +1,1638 @@
> +#include <linux/init.h>
> +#include <linux/file.h>
> +#include <linux/uio.h>
> +#include <linux/umh.h>
> +#include "dm-ploop.h"
> +
> +#define DM_MSG_PREFIX "ploop"
> +
> +static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	BUG_ON(ploop->deferred_cmd && ploop->deferred_cmd != cmd);
> +	ploop->deferred_cmd = cmd;
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +	queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +/*
> + * Assign newly allocated memory for BAT array and holes_bitmap
> + * before grow.
> + */
> +static void ploop_advance_bat_and_holes(struct ploop *ploop,
> +					struct ploop_cmd *cmd)
> +{
> +	unsigned int i, size, dst_cluster;
> +
> +	/* This is called only once */
> +	if (cmd->resize.stage != PLOOP_GROW_STAGE_INITIAL)
> +		return;
> +	cmd->resize.stage++;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	/* Copy and swap holes_bitmap */
> +	size = DIV_ROUND_UP(ploop->hb_nr, 8);
> +	memcpy(cmd->resize.holes_bitmap, ploop->holes_bitmap, size);
> +	swap(cmd->resize.holes_bitmap, ploop->holes_bitmap);
> +	for (i = ploop->hb_nr; i < size * 8; i++)
> +		set_bit(i, ploop->holes_bitmap);
> +	swap(cmd->resize.hb_nr, ploop->hb_nr);
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (!cluster_is_in_top_delta(ploop, i))
> +			continue;
> +		dst_cluster = ploop->bat_entries[i];
> +		if (dst_cluster < ploop->hb_nr &&
> +		    test_bit(dst_cluster, ploop->holes_bitmap)) {
> +			/* This may happen after grow->shrink->(now) grow */
> +			ploop_hole_clear_bit(dst_cluster, ploop);
> +		}
> +	}
> +
> +	/* Copy and swap bat_entries */
> +	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
> +	memcpy(cmd->resize.hdr, ploop->hdr, size);
> +	swap(cmd->resize.hdr, ploop->hdr);
> +	ploop->bat_entries = (void *)ploop->hdr + sizeof(*ploop->hdr);
> +
> +	/* Copy and swap bat_levels */
> +	size = ploop->nr_bat_entries * sizeof(ploop->bat_levels[0]);
> +	memcpy(cmd->resize.bat_levels, ploop->bat_levels, size);
> +	swap(cmd->resize.bat_levels, ploop->bat_levels);
> +	write_unlock_irq(&ploop->bat_rwlock);
> +}
> +
> +/*
> + * Switch index of ploop->inflight_bios_ref[] and wait till inflight
> + * bios are completed. This waits for completion of simple submitted
> + * actions like a write to origin_dev or a read from a delta, but it never
> + * guarantees completion of complex actions like "data write + index
> + * writeback" (for index protection look at cluster locks). This is
> + * weaker than "dmsetup suspend".
> + * It is called from kwork only, so this can't be executed in parallel.
> + */
> +void ploop_inflight_bios_ref_switch(struct ploop *ploop)
> +{
> +	unsigned int index = ploop->inflight_bios_ref_index;
> +
> +	WARN_ON_ONCE(!(current->flags & PF_WQ_WORKER));
> +	init_completion(&ploop->inflight_bios_ref_comp);
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	ploop->inflight_bios_ref_index = !index;
> +	write_unlock_irq(&ploop->bat_rwlock);
> +
> +	percpu_ref_kill(&ploop->inflight_bios_ref[index]);
> +
> +	wait_for_completion(&ploop->inflight_bios_ref_comp);
> +	percpu_ref_reinit(&ploop->inflight_bios_ref[index]);
> +}
> +
> +/* Find existing BAT cluster pointing to dst_cluster */
> +static unsigned int ploop_find_bat_entry(struct ploop *ploop,
> +					 unsigned int dst_cluster,
> +					 bool *is_locked)
> +{
> +	unsigned int i, cluster = UINT_MAX;
> +
> +	read_lock_irq(&ploop->bat_rwlock);
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (ploop->bat_entries[i] != dst_cluster)
> +			continue;
> +		if (cluster_is_in_top_delta(ploop, i)) {
> +			cluster = i;
> +			break;
> +		}
> +	}
> +	read_unlock_irq(&ploop->bat_rwlock);
> +
> +	*is_locked = false;
> +	if (cluster != UINT_MAX) {
> +		spin_lock_irq(&ploop->deferred_lock);
> +		*is_locked = find_lk_of_cluster(ploop, cluster);
> +		spin_unlock_irq(&ploop->deferred_lock);
> +	}
> +
> +	return cluster;
> +}
> +
> +void bio_prepare_offsets(struct ploop *ploop, struct bio *bio,
> +			 unsigned int cluster)
> +{
> +	unsigned int cluster_log = ploop->cluster_log;
> +	int i, nr_pages = nr_pages_in_cluster(ploop);
> +
> +	bio->bi_vcnt = nr_pages;
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		bio->bi_io_vec[i].bv_offset = 0;
> +		bio->bi_io_vec[i].bv_len = PAGE_SIZE;
> +	}
> +	bio->bi_iter.bi_sector = cluster << cluster_log;
> +	bio->bi_iter.bi_size = 1 << (cluster_log + 9);
> +}
> +
> +int ploop_read_cluster_sync(struct ploop *ploop, struct bio *bio,
> +			    unsigned int cluster)
> +{
> +	bio_reset(bio);
> +	bio_prepare_offsets(ploop, bio, cluster);
> +	remap_to_origin(ploop, bio);
> +	bio_set_op_attrs(bio, REQ_OP_READ, 0);
> +
> +	return submit_bio_wait(bio);
> +}
> +
> +static int ploop_write_cluster_sync(struct ploop *ploop, struct bio *bio,
> +				   unsigned int cluster)
> +{
> +	struct block_device *bdev = ploop->origin_dev->bdev;
> +	int ret;
> +
> +	bio_reset(bio);
> +	bio_prepare_offsets(ploop, bio, cluster);
> +	remap_to_origin(ploop, bio);
> +	bio_set_op_attrs(bio, REQ_OP_WRITE, REQ_FUA | REQ_PREFLUSH);
> +
> +	ret = submit_bio_wait(bio);
> +	track_bio(ploop, bio);
> +	if (ret)
> +		return ret;
> +
> +	if (!blk_queue_fua(bdev_get_queue(bdev))) {
> +		/*
> +		 * An error here does not mean that the cluster write has failed,
> +		 * since ploop_map() could submit more bios in parallel.
> +		 * But it's not possible to distinguish them. Should we block
> +		 * ploop_map() while we do this?
> +		 */
> +		ret = blkdev_issue_flush(bdev, GFP_NOIO, NULL);
> +	}
> +
> +	return ret;
> +}
> +
> +static int ploop_write_zero_cluster_sync(struct ploop *ploop,
> +					 struct bio *bio,
> +					 unsigned int cluster)
> +{
> +	bio_reset(bio);
> +	bio_prepare_offsets(ploop, bio, cluster);
> +	zero_fill_bio(bio);
> +
> +	return ploop_write_cluster_sync(ploop, bio, cluster);
> +}
> +
> +static int ploop_grow_relocate_cluster(struct ploop *ploop,
> +				       struct ploop_index_wb *piwb,
> +				       struct ploop_cmd *cmd)
> +{
> +	struct bio *bio = cmd->resize.bio;
> +	unsigned int new_dst, cluster, dst_cluster;
> +	bool is_locked;
> +	int ret = 0;
> +
> +	dst_cluster = cmd->resize.dst_cluster;
> +
> +	/* Relocate cluster and update index */
> +	cluster = ploop_find_bat_entry(ploop, dst_cluster, &is_locked);
> +	if (cluster == UINT_MAX || is_locked) {
> +		/* dst_cluster in top delta is not occupied? */
> +		if (!test_bit(dst_cluster, ploop->holes_bitmap) || is_locked) {
> +			/*
> +			 * No. Maybe, it's under COW. Try again later.
> +			 * FIXME: implement a wait list-like thing for
> +			 * clusters under COW and queue commands there.
> +			 */
> +			schedule_timeout(HZ/10);
> +			goto out;
> +		}
> +		/* Cluster is free, occupy it. Skip relocation */
> +		ploop_hole_clear_bit(dst_cluster, ploop);
> +		goto not_occupied;
> +	}
> +
> +	/* Redirect bios to kwork and wait for inflights, which may use @cluster */
> +	force_defer_bio_count_inc(ploop);
> +	ploop_inflight_bios_ref_switch(ploop);
> +
> +	/* Read full cluster sync */
> +	ret = ploop_read_cluster_sync(ploop, bio, dst_cluster);
> +	if (ret < 0)
> +		goto out;
> +
> +	ret = ploop_prepare_reloc_index_wb(ploop, piwb, cluster,
> +					   &new_dst);
> +	if (ret < 0)
> +		goto out;
> +
> +	/* Write cluster to new destination */
> +	ret = ploop_write_cluster_sync(ploop, bio, new_dst);
> +	if (ret) {
> +		ploop_reset_bat_update(piwb);
> +		goto out;
> +	}
> +
> +	/* Write new index on disk */
> +	ploop_submit_index_wb_sync(ploop, piwb);
> +	ret = blk_status_to_errno(piwb->bi_status);
> +	ploop_reset_bat_update(piwb);
> +	if (ret)
> +		goto out;
> +
> +	/* Update local BAT copy */
> +	write_lock_irq(&ploop->bat_rwlock);
> +	ploop->bat_entries[cluster] = new_dst;
> +	WARN_ON(!cluster_is_in_top_delta(ploop, cluster));
> +	write_unlock_irq(&ploop->bat_rwlock);
> +not_occupied:
> +	/*
> +	 * Now dst_cluster is not referenced in BAT, so increase the value
> +	 * for next iteration. The place we do this is significant: caller
> +	 * makes rollback based on this.
> +	 */
> +	cmd->resize.dst_cluster++;
> +
> +	/* Zero new BAT entries on disk. */
> +	ret = ploop_write_zero_cluster_sync(ploop, bio, dst_cluster);
> +out:
> +	if (cluster != UINT_MAX)
> +		force_defer_bio_count_dec(ploop);
> +
> +	return ret;
> +}
> +
> +static int ploop_grow_update_header(struct ploop *ploop,
> +				    struct ploop_index_wb *piwb,
> +				    struct ploop_cmd *cmd)
> +{
> +	unsigned int size, first_block_off, cluster_log = ploop->cluster_log;
> +	struct ploop_pvd_header *hdr;
> +	int ret;
> +
> +	/* hdr is in the same page as bat_entries[0] index */
> +	ret = ploop_prepare_reloc_index_wb(ploop, piwb, 0, NULL);
> +	if (ret)
> +		return ret;
> +
> +	size = (PLOOP_MAP_OFFSET + cmd->resize.nr_bat_entries);
> +	size *= sizeof(map_index_t);
> +	size = DIV_ROUND_UP(size, 1 << (cluster_log + 9));
> +	first_block_off = size << cluster_log;
> +
> +	hdr = kmap_atomic(piwb->bat_page);
> +	/* TODO: head and cylinders */
> +	hdr->m_Size = cpu_to_le32(cmd->resize.nr_bat_entries);
> +	hdr->m_SizeInSectors_v2 = cpu_to_le64(cmd->resize.new_size);
> +	hdr->m_FirstBlockOffset = cpu_to_le32(first_block_off);
> +	kunmap_atomic(hdr);
> +
> +	ploop_submit_index_wb_sync(ploop, piwb);
> +	ret = blk_status_to_errno(piwb->bi_status);
> +	if (ret)
> +		goto out;
> +
> +	/* Update header local copy */
> +	hdr = kmap_atomic(piwb->bat_page);
> +	write_lock_irq(&ploop->bat_rwlock);
> +	memcpy(ploop->hdr, hdr, sizeof(*hdr));
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	kunmap_atomic(hdr);
> +out:
> +	ploop_reset_bat_update(piwb);
> +	return ret;
> +}
> +
> +/*
> + * Here we relocate data clusters, which may intersect with the BAT area
> + * of the disk after resize. To the user they look already written to disk,
> + * so be careful(!) and protective. Update indexes only after the cluster
> + * data is written to disk.
> + *
> + * This is called from deferred work -- the only place we alloc clusters.
> + * So, nobody can reallocate clusters updated in ploop_grow_relocate_cluster().
> + */
> +static void process_resize_cmd(struct ploop *ploop, struct ploop_index_wb *piwb,
> +			       struct ploop_cmd *cmd)
> +{
> +	unsigned int dst_cluster;
> +	int ret = 0;
> +
> +	/*
> +	 *  Update memory arrays and hb_nr, but do not update nr_bat_entries.
> +	 *  This is a noop except on the first entry to this function.
> +	 */
> +	ploop_advance_bat_and_holes(ploop, cmd);
> +
> +	if (cmd->resize.dst_cluster <= cmd->resize.end_dst_cluster) {
> +		ret = ploop_grow_relocate_cluster(ploop, piwb, cmd);
> +		if (ret)
> +			goto out;
> +
> +		/* Move one cluster per cmd to allow other requests. */
> +		ploop_queue_deferred_cmd(ploop, cmd);
> +		return;
> +	} else {
> +		/* Update header metadata */
> +		ret = ploop_grow_update_header(ploop, piwb, cmd);
> +	}
> +
> +out:
> +	write_lock_irq(&ploop->bat_rwlock);
> +	if (ret) {
> +		/* Cleanup: mark new BAT overages as free clusters */
> +		dst_cluster = cmd->resize.dst_cluster - 1;
> +
> +		while (dst_cluster >= cmd->resize.nr_old_bat_clu) {
> +			ploop_hole_set_bit(dst_cluster, ploop);
> +			dst_cluster--;
> +		}
> +		swap(ploop->hb_nr, cmd->resize.hb_nr);
> +	} else
> +		swap(ploop->nr_bat_entries, cmd->resize.nr_bat_entries);
> +	write_unlock_irq(&ploop->bat_rwlock);
> +
> +	cmd->retval = ret;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +struct bio *alloc_bio_with_pages(struct ploop *ploop)
> +{
> +	unsigned int cluster_log = ploop->cluster_log;
> +	int i, nr_pages = nr_pages_in_cluster(ploop);
> +	struct bio *bio;
> +
> +	bio = bio_alloc(GFP_NOIO, nr_pages);
> +	if (!bio)
> +		return NULL;
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		bio->bi_io_vec[i].bv_page = alloc_page(GFP_NOIO);
> +		if (!bio->bi_io_vec[i].bv_page)
> +			goto err;
> +		bio->bi_io_vec[i].bv_offset = 0;
> +		bio->bi_io_vec[i].bv_len = PAGE_SIZE;
> +	}
> +
> +	bio->bi_vcnt = nr_pages;
> +	bio->bi_iter.bi_size = 1 << (cluster_log + 9);
> +
> +	return bio;
> +err:
> +	while (i-- > 0)
> +		put_page(bio->bi_io_vec[i].bv_page);
> +	bio_put(bio);
> +	return NULL;
> +}
> +
> +void free_bio_with_pages(struct ploop *ploop, struct bio *bio)
> +{
> +	int i, nr_pages = bio->bi_vcnt;
> +	struct page *page;
> +
> +	/*
> +	 * Not an error for this function, but the rest of the code
> +	 * may expect this. Sanity check.
> +	 */
> +	WARN_ON_ONCE(nr_pages != nr_pages_in_cluster(ploop));
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		page = bio->bi_io_vec[i].bv_page;
> +		put_page(page);
> +	}
> +
> +	bio_put(bio);
> +}
> +
> +/* @new_size is in sectors */
> +static int ploop_resize(struct ploop *ploop, u64 new_size)
> +{
> +	unsigned int nr_bat_entries, nr_old_bat_clusters, nr_bat_clusters;
> +	unsigned int hb_nr, size, cluster_log = ploop->cluster_log;
> +	struct ploop_pvd_header *hdr = ploop->hdr;
> +	struct ploop_cmd cmd = { {0} };
> +	int ret = -ENOMEM;
> +	u64 old_size;
> +
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (ploop_is_ro(ploop))
> +		return -EROFS;
> +	old_size = le64_to_cpu(hdr->m_SizeInSectors_v2);
> +	if (old_size == new_size)
> +		return 0;
> +	if (old_size > new_size) {
> +		DMWARN("online shrink is not supported");
> +		return -EINVAL;
> +	} else if ((new_size >> cluster_log) >= UINT_MAX - 2) {
> +		DMWARN("resize: too large size is requested");
> +		return -EINVAL;
> +	} else if (new_size & ((1 << cluster_log) - 1)) {
> +		DMWARN("resize: new_size is not aligned");
> +		return -EINVAL;
> +	}
> +
> +	nr_bat_entries = (new_size >> cluster_log);
> +
> +	size = nr_bat_entries * sizeof(ploop->bat_levels[0]);
> +	cmd.resize.bat_levels = kvzalloc(size, GFP_KERNEL);
> +	if (!cmd.resize.bat_levels)
> +		goto err;
> +
> +	size = (PLOOP_MAP_OFFSET + nr_bat_entries) * sizeof(map_index_t);
> +
> +	/* Memory for hdr + bat_entries */
> +	cmd.resize.hdr = vzalloc(size);
> +	if (!cmd.resize.hdr)
> +		goto err;
> +
> +	nr_bat_clusters = DIV_ROUND_UP(size, 1 << (cluster_log + 9));
> +	hb_nr = nr_bat_clusters + nr_bat_entries;
> +	size = round_up(DIV_ROUND_UP(hb_nr, 8), sizeof(unsigned long));
> +
> +	/* Currently occupied bat clusters */
> +	nr_old_bat_clusters = ploop_nr_bat_clusters(ploop,
> +						    ploop->nr_bat_entries);
> +	/* Memory for holes_bitmap */
> +	cmd.resize.holes_bitmap = kvmalloc(size, GFP_KERNEL);
> +	if (!cmd.resize.holes_bitmap)
> +		goto err;
> +
> +	/* Mark all new bitmap memory as holes */
> +	old_size = DIV_ROUND_UP(ploop->hb_nr, 8);
> +	memset(cmd.resize.holes_bitmap + old_size, 0xff, size - old_size);
> +
> +	cmd.resize.bio = alloc_bio_with_pages(ploop);
> +	if (!cmd.resize.bio)
> +		goto err;
> +	cmd.resize.bio->bi_status = 0;
> +
> +	cmd.resize.cluster = UINT_MAX;
> +	cmd.resize.dst_cluster = nr_old_bat_clusters;
> +	cmd.resize.end_dst_cluster = nr_bat_clusters - 1;
> +	cmd.resize.nr_old_bat_clu = nr_old_bat_clusters;
> +	cmd.resize.nr_bat_entries = nr_bat_entries;
> +	cmd.resize.hb_nr = hb_nr;
> +	cmd.resize.new_size = new_size;
> +	cmd.retval = 0;
> +	cmd.type = PLOOP_CMD_RESIZE;
> +	cmd.ploop = ploop;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +
> +	ret = cmd.retval;
> +err:
> +	if (cmd.resize.bio)
> +		free_bio_with_pages(ploop, cmd.resize.bio);
> +	kvfree(cmd.resize.bat_levels);
> +	kvfree(cmd.resize.holes_bitmap);
> +	vfree(cmd.resize.hdr);
> +	return ret;
> +}
> +
> +/* FIXME: this must not be called on running device */
> +static void process_add_delta_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	map_index_t *bat_entries, *delta_bat_entries;
> +	unsigned int i, level, dst_cluster;
> +	u8 *bat_levels;
> +
> +	if (unlikely(ploop->force_link_inflight_bios)) {
> +		cmd->retval = -EBUSY;
> +		pr_err("ploop: adding delta on running device\n");
> +		goto out;
> +	}
> +
> +	level = ploop->nr_deltas;
> +	bat_entries = ploop->bat_entries;
> +	bat_levels = ploop->bat_levels;
> +	delta_bat_entries = (map_index_t *)cmd->add_delta.hdr + PLOOP_MAP_OFFSET;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +
> +	/* FIXME: Stop on old delta's nr_bat_entries */
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (cluster_is_in_top_delta(ploop, i))
> +			continue;
> +		if (!cmd->add_delta.is_raw)
> +			dst_cluster = delta_bat_entries[i];
> +		else
> +			dst_cluster = i < cmd->add_delta.raw_clusters ? i : BAT_ENTRY_NONE;
> +		if (dst_cluster == BAT_ENTRY_NONE)
> +			continue;
> +		/*
> +		 * Prefer last added delta, since the order is:
> +		 * 1)add top device
> +		 * 2)add oldest delta
> +		 * ...
> +		 * n)add newest delta
> +		 * Keep in mind, the top device is the current image, and
> +		 * it is added first, contrary to the "age" order.
> +		 */
> +		bat_levels[i] = level;
> +		bat_entries[i] = dst_cluster;
> +	}
> +
> +	swap(ploop->deltas, cmd->add_delta.deltas);
> +	ploop->nr_deltas++;
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	get_file(ploop->deltas[level]);
> +	cmd->retval = 0;
> +out:
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static int ploop_check_raw_delta(struct ploop *ploop, struct file *file,
> +				 struct ploop_cmd *cmd)
> +{
> +	loff_t loff = i_size_read(file->f_mapping->host);
> +	unsigned int cluster_log = ploop->cluster_log;
> +
> +	if (loff & ((1 << (cluster_log + SECTOR_SHIFT)) - 1))
> +		return -EPROTO;
> +	cmd->add_delta.raw_clusters = loff >> (cluster_log + SECTOR_SHIFT);
> +	return 0;
> +}
> +
> +/*
> + * @fd refers to a new delta, which is placed right before top_delta.
> + * So, userspace has to populate deltas stack from oldest to newest.
> + */
> +int ploop_add_delta(struct ploop *ploop, const char *arg)
> +{
> +	unsigned int level = ploop->nr_deltas;
> +	struct ploop_cmd cmd = { {0} };
> +	struct file **deltas;
> +	bool is_raw = false;
> +	unsigned int size;
> +	struct file *file;
> +	int fd, ret;
> +
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (strncmp(arg, "raw@", 4) == 0) {
> +		is_raw = true;
> +		arg += 4;
> +	}
> +	if (level == BAT_LEVEL_TOP || (is_raw && level))
> +		return -EMFILE;
> +	if (kstrtos32(arg, 10, &fd) < 0)
> +		return -EINVAL;
> +
> +	file = fget(fd);
> +	if (!file)
> +		return -ENOENT;
> +	ret = -EBADF;
> +	if (!(file->f_mode & FMODE_READ))
> +		goto out;
> +
> +	ret = -ENOMEM;
> +	deltas = kcalloc(level + 1, sizeof(*deltas), GFP_KERNEL);
> +	if (!deltas)
> +		goto out;
> +	size = level * sizeof(*deltas);
> +	memcpy(deltas, ploop->deltas, size);
> +	deltas[level] = file;
> +	/*
> +	 * BAT update in general is driven by the kwork
> +	 * (see comment in process_one_deferred_bio()),
> +	 * so we delegate the cmd to it.
> +	 */
> +	cmd.add_delta.deltas = deltas;
> +	cmd.add_delta.is_raw = is_raw;
> +	cmd.type = PLOOP_CMD_ADD_DELTA;
> +	cmd.ploop = ploop;
> +
> +	if (is_raw)
> +		ret = ploop_check_raw_delta(ploop, file, &cmd);
> +	else
> +		ret = ploop_read_delta_metadata(ploop, file,
> +						&cmd.add_delta.hdr);
> +	if (ret)
> +		goto out;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ret = cmd.retval;
> +out:
> +	vfree(cmd.add_delta.hdr);
> +	kfree(cmd.add_delta.deltas);
> +	fput(file);
> +	return ret;
> +}
> +static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop,
> +					     int ret, void *data)
> +{
> +	struct ploop_cmd *cmd = data;
> +
> +	if (ret) {
> +		/* kwork will see this at next time it is on cpu */
> +		WRITE_ONCE(cmd->retval, ret);
> +	}
> +	atomic_inc(&cmd->merge.nr_available);
> +	ploop_queue_deferred_cmd(cmd->ploop, cmd);
> +}
> +
> +/* Find mergeable cluster and return it in cmd->merge.cluster */
> +static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	unsigned int *cluster = &cmd->merge.cluster;
> +	unsigned int level;
> +	bool skip;
> +
> +	BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT);
> +
> +	for (; *cluster < ploop->nr_bat_entries; ++*cluster) {
> +		/*
> +		 * Check *cluster is provided by the merged delta.
> +		 * We are in kwork, so bat_rwlock is not needed
> +		 * (see comment in process_one_deferred_bio()).
> +		 */
> +		level = ploop->bat_levels[*cluster];
> +		if (ploop->bat_entries[*cluster] == BAT_ENTRY_NONE ||
> +		    level != ploop->nr_deltas - 1)
> +			continue;
> +
> +		spin_lock_irq(&ploop->deferred_lock);
> +		skip = find_lk_of_cluster(ploop, *cluster);
> +		spin_unlock_irq(&ploop->deferred_lock);
> +		if (skip) {
> +			/*
> +			 * Cluster is locked (maybe, under COW).
> +			 * Skip it and try to repeat later.
> +			 */
> +			cmd->merge.do_repeat = true;
> +			continue;
> +		}
> +
> +		return true;
> +	}
> +
> +	return false;
> +}
> +
> +static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
> +					      struct ploop_cmd *cmd)
> +{
> +	unsigned int *cluster = &cmd->merge.cluster;
> +	unsigned int level, dst_cluster;
> +	struct file *file;
> +
> +	if (cmd->retval)
> +		goto out;
> +
> +	while (iter_delta_clusters(ploop, cmd)) {
> +		/*
> +		 * We are in kwork, so bat_rwlock is not needed
> +		 * (we can't race with changing BAT, since cmds
> +		 *  are processed before bios and piwb is sync).
> +		 */
> +		dst_cluster = ploop->bat_entries[*cluster];
> +		level = ploop->bat_levels[*cluster];
> +
> +		/* Check we can submit one more cow in parallel */
> +		if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0))
> +			return;
> +
> +		if (submit_cluster_cow(ploop, level, *cluster, dst_cluster,
> +				    ploop_queue_deferred_cmd_wrapper, cmd)) {
> +			atomic_inc(&cmd->merge.nr_available);
> +			cmd->retval = -ENOMEM;
> +			goto out;
> +		}
> +
> +		++*cluster;
> +	}
> +out:
> +	if (atomic_read(&cmd->merge.nr_available) != NR_MERGE_BIOS) {
> +		/* Wait till last COW queues us */
> +		return;
> +	}
> +
> +	if (cmd->retval == 0 && !cmd->merge.do_repeat) {
> +		/* Delta merged. Release delta's file */
> +		write_lock_irq(&ploop->bat_rwlock);
> +		file = ploop->deltas[--ploop->nr_deltas];
> +		write_unlock_irq(&ploop->bat_rwlock);
> +		ploop_inflight_bios_ref_switch(ploop);
> +		fput(file);
> +	}
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static int ploop_merge_latest_snapshot(struct ploop *ploop)
> +{
> +	struct ploop_cmd cmd;
> +	int ret;
> +
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (ploop_is_ro(ploop))
> +		return -EROFS;
> +	if (!ploop->nr_deltas)
> +		return -ENOENT;
> +again:
> +	memset(&cmd, 0, sizeof(cmd));
> +	cmd.type = PLOOP_CMD_MERGE_SNAPSHOT;
> +	cmd.ploop = ploop;
> +	atomic_set(&cmd.merge.nr_available, NR_MERGE_BIOS);
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	ret = wait_for_completion_interruptible(&cmd.comp);
> +	if (ret) {
> +		/*
> +		 * process_merge_latest_snapshot_cmd() will see this
> +		 * sooner or later. Take a lock if you want it sooner.
> +		 */
> +		WRITE_ONCE(cmd.retval, -EINTR);
> +		wait_for_completion(&cmd.comp);
> +	}
> +
> +	if (cmd.retval == 0 && cmd.merge.do_repeat)
> +		goto again;
> +
> +	return cmd.retval;
> +}
> +
> +static void process_notify_delta_merged(struct ploop *ploop,
> +					struct ploop_cmd *cmd)
> +{
> +	unsigned int i, *bat_entries, *delta_bat_entries;
> +	void *hdr = cmd->notify_delta_merged.hdr;
> +	u8 level = cmd->notify_delta_merged.level;
> +	struct file *file;
> +	u8 *bat_levels;
> +
> +	bat_entries = ploop->bat_entries;
> +	bat_levels = ploop->bat_levels;
> +	delta_bat_entries = (map_index_t *)hdr + PLOOP_MAP_OFFSET;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (cluster_is_in_top_delta(ploop, i) ||
> +		    delta_bat_entries[i] == BAT_ENTRY_NONE ||
> +		    bat_levels[i] < level) {
> +			continue;
> +		}
> +
> +		/* deltas above @level become renumbered */
> +		if (bat_levels[i] > level) {
> +			bat_levels[i]--;
> +			continue;
> +		}
> +
> +		/*
> +		 * clusters from the delta at @level now point to the next delta
> +		 * (which has been renumbered) or the prev delta (if !@forward).
> +		 */
> +		bat_entries[i] = delta_bat_entries[i];
> +		WARN_ON(bat_entries[i] == BAT_ENTRY_NONE);
> +		if (!cmd->notify_delta_merged.forward)
> +			bat_levels[i]--;
> +	}
> +
> +	file = ploop->deltas[level];
> +	/* Renumber deltas above @level */
> +	for (i = level + 1; i < ploop->nr_deltas; i++)
> +		ploop->deltas[i - 1] = ploop->deltas[i];
> +	ploop->deltas[--ploop->nr_deltas] = NULL;
> +	write_unlock_irq(&ploop->bat_rwlock);
> +
> +	ploop_inflight_bios_ref_switch(ploop);
> +	fput(file);
> +
> +	cmd->retval = 0;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static void process_update_delta_index(struct ploop *ploop,
> +				       struct ploop_cmd *cmd)
> +{
> +	const char *map = cmd->update_delta_index.map;
> +	u8 level = cmd->update_delta_index.level;
> +	unsigned int cluster, dst_cluster, n;
> +	int ret = -EINVAL;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	/* Check all */
> +	while (sscanf(map, "%u:%u;%n", &cluster, &dst_cluster, &n) == 2) {
> +		if (cluster >= ploop->nr_bat_entries)
> +			break;
> +		if (ploop->bat_entries[cluster] == BAT_ENTRY_NONE)
> +			break;
> +		map += n;
> +	}
> +	if (map[0] != '\0')
> +		goto unlock;
> +	/* Commit all */
> +	map = cmd->update_delta_index.map;
> +	while (sscanf(map, "%u:%u;%n", &cluster, &dst_cluster, &n) == 2) {
> +		if (ploop->bat_levels[cluster] == level)
> +			ploop->bat_entries[cluster] = dst_cluster;
> +		map += n;
> +	}
> +	ret = 0;
> +unlock:
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	if (!ret)
> +		ploop_inflight_bios_ref_switch(ploop);
> +
> +	cmd->retval = ret;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static int ploop_delta_clusters_merged(struct ploop *ploop, u8 level,
> +				       bool forward)
> +{
> +	struct ploop_cmd cmd = { {0} };
> +	void *d_hdr = NULL;
> +	struct file *file;
> +	int ret;
> +
> +	/* Reread BAT of deltas[@level + 1] (or [@level - 1]) */
> +	file = ploop->deltas[level + (forward ? 1 : -1)];
> +
> +	ret = ploop_read_delta_metadata(ploop, file, &d_hdr);
> +	if (ret)
> +		goto out;
> +
> +	cmd.notify_delta_merged.level = level;
> +	cmd.notify_delta_merged.hdr = d_hdr;
> +	cmd.notify_delta_merged.forward = forward;
> +	cmd.type = PLOOP_CMD_NOTIFY_DELTA_MERGED;
> +	cmd.ploop = ploop;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ret = cmd.retval;
> +out:
> +	vfree(d_hdr);
> +	return ret;
> +}
> +
> +static int ploop_notify_merged(struct ploop *ploop, u8 level, bool forward)
> +{
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (level >= ploop->nr_deltas)
> +		return -ENOENT;
> +	if (level == 0 && !forward)
> +		return -EINVAL;
> +	if (level == ploop->nr_deltas - 1 && forward)
> +		return -EINVAL;
> +	/*
> +	 * Userspace notifies us, it has copied clusters of
> +	 * ploop->deltas[@level] to ploop->deltas[@level + 1]
> +	 * (deltas[@level] to deltas[@level - 1] if !@forward).
> +	 * Now we want to update our bat_entries/levels arrays,
> +	 * where ploop->deltas[@level] is used currently, to use
> +	 * @level + 1 instead. Also we want to put @level's file,
> +	 * and renumber the deltas.
> +	 */
> +	return ploop_delta_clusters_merged(ploop, level, forward);
> +}
> +
> +static int ploop_get_delta_name_cmd(struct ploop *ploop, u8 level,
> +				char *result, unsigned int maxlen)
> +{
> +	struct file *file;
> +	int len, ret;
> +	char *p;
> +
> +	if (level >= ploop->nr_deltas)
> +		return -ENOENT;
> +
> +	/*
> +	 * Nobody can change deltas in parallel, since
> +	 * other cmds are prohibited, but do this
> +	 * for uniformity.
> +	 */
> +	read_lock_irq(&ploop->bat_rwlock);
> +	file = get_file(ploop->deltas[level]);
> +	read_unlock_irq(&ploop->bat_rwlock);
> +
> +	p = file_path(file, result, maxlen);
> +	ret = 1;
> +	if (p == ERR_PTR(-ENAMETOOLONG)) {
> +		/* Notify target_message(), there is not enough space */
> +		memset(result, 'x', maxlen - 1);
> +		result[maxlen - 1] = 0;
> +	} else if (IS_ERR_OR_NULL(p)) {
> +		ret = PTR_ERR(p);
> +	} else {
> +		len = strlen(p);
> +		memmove(result, p, len);
> +		result[len] = '\n';
> +		result[len + 1] = '\0';
> +	}
> +
> +	fput(file);
> +	return ret;
> +}
> +
> +static int ploop_update_delta_index(struct ploop *ploop, unsigned int level,
> +				    const char *map)
> +{
> +	struct ploop_cmd cmd = { {0} };
> +
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (level >= ploop->nr_deltas)
> +		return -ENOENT;
> +
> +	cmd.update_delta_index.level = level;
> +	cmd.update_delta_index.map = map;
> +	cmd.type = PLOOP_CMD_UPDATE_DELTA_INDEX;
> +	cmd.ploop = ploop;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	return cmd.retval;
> +}
> +
> +static void process_switch_top_delta(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	unsigned int i, size, bat_clusters, level = ploop->nr_deltas;
> +
> +	force_defer_bio_count_inc(ploop);
> +	ploop_inflight_bios_ref_switch(ploop);
> +
> +	/* If you add more two-stage actions, you must cancel them here too */
> +	cancel_discard_bios(ploop);
> +	restart_delta_cow(ploop);
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	swap(ploop->origin_dev, cmd->switch_top_delta.origin_dev);
> +	swap(ploop->deltas, cmd->switch_top_delta.deltas);
> +	for (i = 0; i < ploop->nr_bat_entries; i++)
> +		if (ploop->bat_levels[i] == BAT_LEVEL_TOP)
> +			ploop->bat_levels[i] = level;
> +
> +	/* Header and BAT-occupied clusters at start of file */
> +	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
> +	bat_clusters = DIV_ROUND_UP(size, 1 << (ploop->cluster_log + 9));
> +	for (i = 0; i < ploop->hb_nr; i++) {
> +		if (i < bat_clusters)
> +			clear_bit(i, ploop->holes_bitmap);
> +		else
> +			set_bit(i, ploop->holes_bitmap);
> +	}
> +
> +	ploop->nr_deltas++;
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	force_defer_bio_count_dec(ploop);
> +
> +	cmd->retval = 0;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +/* Switch top delta to new device after userspace has created snapshot */
> +static int ploop_switch_top_delta(struct ploop *ploop, int new_ro_fd,
> +				  char *new_dev)
> +{
> +	struct dm_target *ti = ploop->ti;
> +	struct ploop_cmd cmd = { {0} };
> +	struct file *file;
> +	unsigned int size;
> +	int ret;
> +
> +	cmd.type = PLOOP_CMD_SWITCH_TOP_DELTA;
> +	cmd.ploop = ploop;
> +
> +	if (ploop->maintaince)
> +		return -EBUSY;
> +	if (ploop->nr_deltas == BAT_LEVEL_TOP)
> +		return -EMFILE;
> +	if (!(file = fget(new_ro_fd)))
> +		return -EBADF;
> +	ret = dm_get_device(ti, new_dev, dm_table_get_mode(ti->table),
> +			    &cmd.switch_top_delta.origin_dev);
> +	if (ret)
> +		goto fput;
> +	ret = -ENOMEM;
> +	size = (ploop->nr_deltas + 1) * sizeof(struct file *);
> +	cmd.switch_top_delta.deltas = kmalloc(size, GFP_NOIO);
> +	if (!cmd.switch_top_delta.deltas)
> +		goto put_dev;
> +	size -= sizeof(struct file *);
> +	memcpy(cmd.switch_top_delta.deltas, ploop->deltas, size);
> +	cmd.switch_top_delta.deltas[ploop->nr_deltas] = file;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ret = cmd.retval;
> +	kfree(cmd.switch_top_delta.deltas);
> +put_dev:
> +	dm_put_device(ploop->ti, cmd.switch_top_delta.origin_dev);
> +fput:
> +	if (ret)
> +		fput(file);
> +	return ret;
> +}
> +
> +static void process_flip_upper_deltas(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	unsigned int i, size, bat_clusters, hb_nr = ploop->hb_nr;
> +	void *holes_bitmap = ploop->holes_bitmap;
> +	u8 level = ploop->nr_deltas - 1;
> +
> +	size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
> +	bat_clusters = DIV_ROUND_UP(size, 1 << (ploop->cluster_log + 9));
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	/* Prepare holes_bitmap */
> +	memset(holes_bitmap, 0xff, hb_nr/8);
> +	for (i = (hb_nr & ~0x7); i < hb_nr; i++)
> +		set_bit(i, holes_bitmap);
> +	for (i = 0; i < bat_clusters; i++)
> +		clear_bit(i, holes_bitmap);
> +
> +	/* Flip bat entries */
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (ploop->bat_entries[i] == BAT_ENTRY_NONE)
> +			continue;
> +		if (ploop->bat_levels[i] == level) {
> +			ploop->bat_levels[i] = BAT_LEVEL_TOP;
> +			clear_bit(ploop->bat_entries[i], holes_bitmap);
> +		} else if (ploop->bat_levels[i] == BAT_LEVEL_TOP) {
> +			ploop->bat_levels[i] = level;
> +		}
> +	}
> +	swap(ploop->origin_dev, cmd->flip_upper_deltas.origin_dev);
> +	swap(ploop->deltas[level], cmd->flip_upper_deltas.file);
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	/* Device is suspended, but anyway... */
> +	ploop_inflight_bios_ref_switch(ploop);
> +
> +	cmd->retval = 0;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static void process_tracking_start(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	unsigned int i, dst_cluster, tb_nr = cmd->tracking_start.tb_nr;
> +	void *tracking_bitmap = cmd->tracking_start.tracking_bitmap;
> +	int ret = 0;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	ploop->tracking_bitmap = tracking_bitmap;
> +	ploop->tb_nr = tb_nr;
> +	write_unlock_irq(&ploop->bat_rwlock);
> +
> +	/*
> +	 * Here we care that ploop_map() sees ploop->tracking_bitmap,
> +	 * since the rest of the submissions are made from *this* kwork.
> +	 */
> +	ploop_inflight_bios_ref_switch(ploop);
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	for_each_clear_bit(i, ploop->holes_bitmap, ploop->hb_nr)
> +		set_bit(i, tracking_bitmap);
> +	for (i = 0; i < ploop->nr_bat_entries; i++) {
> +		if (!cluster_is_in_top_delta(ploop, i))
> +			continue;
> +		dst_cluster = ploop->bat_entries[i];
> +		if (WARN_ON(dst_cluster >= tb_nr)) {
> +			ret = -EIO;
> +			goto unlock;
> +		}
> +		set_bit(dst_cluster, tracking_bitmap);
> +	}
> +unlock:
> +	write_unlock_irq(&ploop->bat_rwlock);
> +
> +	cmd->retval = ret;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +}
> +
> +static int tracking_get_next(struct ploop *ploop, char *result,
> +			     unsigned int maxlen)
> +{
> +	unsigned int i, sz = 0, tb_nr = ploop->tb_nr, prev = ploop->tb_cursor;
> +	void *tracking_bitmap = ploop->tracking_bitmap;
> +	int ret = -EAGAIN;
> +
> +	if (WARN_ON_ONCE(prev > tb_nr - 1))
> +		prev = 0;
> +
> +	write_lock_irq(&ploop->bat_rwlock);
> +	i = find_next_bit(tracking_bitmap, tb_nr, prev + 1);
> +	if (i < tb_nr)
> +		goto found;
> +	i = find_first_bit(tracking_bitmap, prev + 1);
> +	if (i >= prev + 1)
> +		goto unlock;
> +found:
> +	ret = (DMEMIT("%u\n", i)) ? 1 : 0;
> +	if (ret)
> +		clear_bit(i, tracking_bitmap);
> +unlock:
> +	write_unlock_irq(&ploop->bat_rwlock);
> +	if (ret > 0)
> +		ploop->tb_cursor = i;
> +	return ret;
> +}
> +
> +static int ploop_tracking_cmd(struct ploop *ploop, const char *suffix,
> +			      char *result, unsigned int maxlen)
> +{
> +	struct ploop_cmd cmd = { {0} };
> +	void *tracking_bitmap = NULL;
> +	unsigned int i, tb_nr, size;
> +
> +	if (ploop_is_ro(ploop))
> +		return -EROFS;
> +
> +	if (!strcmp(suffix, "get_next")) {
> +		if (!ploop->tracking_bitmap)
> +			return -ENOENT;
> +		return tracking_get_next(ploop, result, maxlen);
> +	}
> +
> +	if (!strcmp(suffix, "start")) {
> +		if (ploop->tracking_bitmap)
> +			return -EEXIST;
> +		if (ploop->maintaince)
> +			return -EBUSY;
> +		tb_nr = ploop->hb_nr;
> +		read_lock_irq(&ploop->bat_rwlock);
> +		for (i = 0; i < ploop->nr_bat_entries; i++)
> +			if (cluster_is_in_top_delta(ploop, i) &&
> +			    ploop->bat_entries[i] >= tb_nr)
> +				tb_nr = ploop->bat_entries[i] + 1;
> +		read_unlock_irq(&ploop->bat_rwlock);
> +		/*
> +		 * After unlock new entries above tb_nr can't
> +		 * occur, since we always alloc clusters from
> +		 * holes_bitmap (and their nr < hb_nr).
> +		 */
> +		size = DIV_ROUND_UP(tb_nr, 8 * sizeof(unsigned long));
> +		size *= sizeof(unsigned long);
> +		tracking_bitmap = kvzalloc(size, GFP_KERNEL);
> +		if (!tracking_bitmap)
> +			return -ENOMEM;
> +		ploop->tb_cursor = tb_nr - 1;
> +
> +		cmd.type = PLOOP_CMD_TRACKING_START;
> +		cmd.ploop = ploop;
> +		cmd.tracking_start.tracking_bitmap = tracking_bitmap;
> +		cmd.tracking_start.tb_nr = tb_nr;
> +
> +		init_completion(&cmd.comp);
> +		ploop_queue_deferred_cmd(ploop, &cmd);
> +		wait_for_completion(&cmd.comp);
> +		ploop->maintaince = true;
> +	} else if (!strcmp(suffix, "stop")) {
> +		if (!ploop->tracking_bitmap)
> +			return -ENOENT;
> +		write_lock_irq(&ploop->bat_rwlock);
> +		kvfree(ploop->tracking_bitmap);
> +		ploop->tracking_bitmap = NULL;
> +		write_unlock_irq(&ploop->bat_rwlock);
> +		ploop->maintaince = false;
> +	} else {
> +		return -EINVAL;
> +	}
> +
> +	return 0;
> +}
> +
> +static int ploop_set_noresume(struct ploop *ploop, char *mode)
> +{
> +	bool noresume;
> +
> +	if (!strcmp(mode, "1"))
> +		noresume = true;
> +	else if (!strcmp(mode, "0"))
> +		noresume = false;
> +	else
> +		return -EINVAL;
> +
> +	if (noresume == ploop->noresume)
> +		return -EBUSY;
> +
> +	ploop->noresume = noresume;
> +	return 0;
> +}
> +
> +static int ploop_flip_upper_deltas(struct ploop *ploop, char *new_dev,
> +				   char *new_ro_fd)
> +{
> +	struct dm_target *ti = ploop->ti;
> +	struct ploop_cmd cmd = { {0} };
> +	int new_fd, ret;
> +
> +	cmd.type = PLOOP_CMD_FLIP_UPPER_DELTAS;
> +	cmd.ploop = ploop;
> +
> +	/* FIXME: prohibit flip on raw delta */
> +	if (!dm_suspended(ti) || !ploop->noresume || ploop->maintaince)
> +		return -EBUSY;
> +	if (ploop_is_ro(ploop))
> +		return -EROFS;
> +	if (!ploop->nr_deltas)
> +		return -ENOENT;
> +	if (kstrtou32(new_ro_fd, 10, &new_fd) < 0 ||
> +	    !(cmd.flip_upper_deltas.file = fget(new_fd)))
> +		return -EBADF;
> +	ret = dm_get_device(ti, new_dev, dm_table_get_mode(ti->table),
> +			    &cmd.flip_upper_deltas.origin_dev);
> +	if (ret)
> +		goto fput;
> +
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ret = cmd.retval;
> +	dm_put_device(ploop->ti, cmd.flip_upper_deltas.origin_dev);
> +fput:
> +	fput(cmd.flip_upper_deltas.file);
> +	return ret;
> +}
> +
> +static void process_set_push_backup(struct ploop *ploop, struct ploop_cmd *cmd)
> +{
> +	struct push_backup *pb = cmd->set_push_backup.pb;
> +
> +	if (!pb)
> +		cleanup_backup(ploop);
> +
> +	spin_lock_irq(&ploop->pb_lock);
> +	/* Take bat_rwlock to make pb visible in ploop_map() */
> +	write_lock(&ploop->bat_rwlock);
> +	swap(ploop->pb, pb);
> +	write_unlock(&ploop->bat_rwlock);
> +	spin_unlock_irq(&ploop->pb_lock);
> +	cmd->retval = 0;
> +	complete(&cmd->comp); /* Last touch of cmd memory */
> +
> +	if (pb)
> +		ploop_free_pb(pb);
> +}
> +
> +static struct push_backup *ploop_alloc_pb(struct ploop *ploop, char *uuid)
> +{
> +	struct push_backup *pb;
> +	unsigned int size;
> +	void *map;
> +
> +	pb = kzalloc(sizeof(*pb), GFP_KERNEL);
> +	if (!pb)
> +		return NULL;
> +	snprintf(pb->uuid, sizeof(pb->uuid), "%s", uuid);
> +	INIT_LIST_HEAD(&pb->pending);
> +	pb->rb_root = RB_ROOT;
> +
> +	size = DIV_ROUND_UP(ploop->nr_bat_entries, 8);
> +	size = round_up(size, sizeof(unsigned long));
> +	map = kvzalloc(size, GFP_KERNEL);
> +	if (!map)
> +		goto out_pb;
> +
> +	pb->ppb_map = map;
> +	return pb;
> +out_pb:
> +	kfree(pb);
> +	return NULL;
> +}
> +
> +void ploop_free_pb(struct push_backup *pb)
> +{
> +	WARN_ON(!RB_EMPTY_ROOT(&pb->rb_root));
> +	kvfree(pb->ppb_map);
> +	kfree(pb);
> +}
> +
> +static void ploop_pb_timer(struct timer_list *timer)
> +{
> +	struct push_backup *pb = from_timer(pb, timer, deadline_timer);
> +	u64 deadline, now = get_jiffies_64();
> +	struct ploop *ploop = pb->ploop;
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->pb_lock, flags);
> +	deadline = pb->deadline_jiffies;
> +	spin_unlock_irqrestore(&ploop->pb_lock, flags);
> +
> +	if (unlikely(time_before64(now, deadline)))
> +		mod_timer(timer, deadline - now + 1);
> +	else
> +		queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +static void ploop_setup_pb(struct ploop *ploop, struct push_backup *pb)
> +{
> +	unsigned int i, nr_bat_entries = ploop->nr_bat_entries;
> +
> +	/* Full backup */
> +	memset(pb->ppb_map, 0xff, nr_bat_entries / 8);
> +	for (i = round_down(nr_bat_entries, 8); i < nr_bat_entries; i++)
> +		set_bit(i, pb->ppb_map);
> +
> +	pb->deadline_jiffies = S64_MAX;
> +	timer_setup(&pb->deadline_timer, ploop_pb_timer, 0);
> +
> +	pb->ploop = ploop;
> +	pb->alive = true;
> +}
> +
> +static int ploop_push_backup_start(struct ploop *ploop, char *uuid,
> +				   void __user *mask)
> +{
> +	struct ploop_cmd cmd = { {0} };
> +	struct push_backup *pb;
> +
> +	cmd.type = PLOOP_CMD_SET_PUSH_BACKUP;
> +	cmd.ploop = ploop;
> +
> +	if (mask)
> +		return -ENOPROTOOPT; /* TODO */
> +
> +	if (ploop->pb)
> +		return -EEXIST;
> +	/*
> +	 * There is no problem if the device is not suspended,
> +	 * but this means userspace collects a wrong backup. Warn about it here.
> +	 * Since the device is suspended, we do not care about inflight bios.
> +	 */
> +	if (!dm_suspended(ploop->ti) || ploop->maintaince)
> +		return -EBUSY;
> +	if (strlen(uuid) > sizeof(pb->uuid) - 1)
> +		return -EINVAL;
> +	pb = ploop_alloc_pb(ploop, uuid);
> +	if (!pb)
> +		return -ENOMEM;
> +	ploop_setup_pb(ploop, pb);
> +
> +	/* Assign pb in work, to make it visible w/o locks (in work) */
> +	cmd.set_push_backup.pb = pb;
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ploop->maintaince = true;
> +	return 0;
> +}
> +
> +static int ploop_push_backup_stop(struct ploop *ploop, char *uuid)
> +{
> +	struct ploop_cmd cmd = { {0} };
> +
> +	cmd.type = PLOOP_CMD_SET_PUSH_BACKUP;
> +	cmd.ploop = ploop;
> +
> +	if (!ploop->pb)
> +		return -ENOENT;
> +	if (strcmp(ploop->pb->uuid, uuid))
> +		return -EINVAL;
> +
> +	WARN_ON(!ploop->maintaince);
> +
> +	/* Assign pb in work, to make it visible w/o locks (in work) */
> +	init_completion(&cmd.comp);
> +	ploop_queue_deferred_cmd(ploop, &cmd);
> +	wait_for_completion(&cmd.comp);
> +	ploop->maintaince = false;
> +	return 0;
> +}
> +
> +static int ploop_push_backup_get_uuid(struct ploop *ploop, char *result,
> +				      unsigned int maxlen)
> +{
> +	struct push_backup *pb = ploop->pb;
> +	unsigned int sz = 0;
> +
> +	if (!pb)
> +		return -EBADF;
> +
> +	DMEMIT("%s", pb->uuid);
> +	return 1;
> +}
> +
> +static int ploop_push_backup_read(struct ploop *ploop, char *uuid,
> +				char *result, unsigned int maxlen)
> +{
> +	struct dm_ploop_endio_hook *h, *orig_h;
> +	struct push_backup *pb = ploop->pb;
> +	unsigned int left, right, sz = 0;
> +	struct rb_node *node;
> +	int ret;
> +
> +	if (!pb)
> +		return -EBADF;
> +	if (strcmp(uuid, pb->uuid))
> +		return -EINVAL;
> +	if (!pb->alive)
> +		return -ESTALE;
> +
> +	spin_lock_irq(&ploop->pb_lock);
> +	ret = -ENOENT;
> +	h = orig_h = list_first_entry_or_null(&pb->pending, typeof(*h), list);
> +	if (!h)
> +		goto unlock;
> +	list_del_init(&h->list);
> +
> +	left = right = h->cluster;
> +	while ((node = rb_prev(&h->node)) != NULL) {
> +		h = rb_entry(node, struct dm_ploop_endio_hook, node);
> +		if (h->cluster + 1 != left || list_empty(&h->list))
> +			break;
> +		list_del_init(&h->list);
> +		left = h->cluster;
> +	}
> +
> +	h = orig_h;
> +	while ((node = rb_next(&h->node)) != NULL) {
> +		h = rb_entry(node, struct dm_ploop_endio_hook, node);
> +		if (h->cluster - 1 != right || list_empty(&h->list))
> +			break;
> +		list_del_init(&h->list);
> +		right = h->cluster;
> +	}
> +
> +	DMEMIT("%u:%u", left, right - left + 1);
> +	ret = 1;
> +unlock:
> +	spin_unlock_irq(&ploop->pb_lock);
> +	return ret;
> +}
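> +
> +/*
> + * Illustrative example: if pb->rb_root holds postponed writes to
> + * clusters 5, 6, 7, 8 and 12, and the head of pb->pending is the hook
> + * of cluster 7, the loops above expand left to 5 and right to 8, so
> + * the message returns "5:4". Cluster 12 stays queued for the next
> + * push_backup_read call.
> + */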
> +
> +static int ploop_push_backup_write(struct ploop *ploop, char *uuid,
> +			     unsigned int cluster, unsigned int nr)
> +{
> +	unsigned int i, nr_bat_entries = ploop->nr_bat_entries;
> +	struct bio_list bio_list = BIO_EMPTY_LIST;
> +	struct push_backup *pb = ploop->pb;
> +	struct dm_ploop_endio_hook *h;
> +	bool has_more = false;
> +
> +	if (!pb)
> +		return -EBADF;
> +	if (strcmp(uuid, pb->uuid) || !nr)
> +		return -EINVAL;
> +	if (cluster >= nr_bat_entries || cluster + nr >= nr_bat_entries)
> +		return -E2BIG;
> +	if (!pb->alive)
> +		return -ESTALE;
> +
> +	spin_lock_irq(&ploop->pb_lock);
> +	for (i = cluster; i < cluster + nr; i++) {
> +		clear_bit(i, pb->ppb_map);
> +		/* TODO: optimize by introduction find_endio_hook_after() */
> +		h = find_endio_hook(ploop, &pb->rb_root, i);
> +		if (h)
> +			unlink_postponed_backup_endio(ploop, &bio_list, h);
> +	}
> +
> +	has_more = !RB_EMPTY_ROOT(&pb->rb_root);
> +	if (has_more)
> +		pb->deadline_jiffies = get_jiffies_64() + BACKUP_DEADLINE * HZ;
> +	else
> +		pb->deadline_jiffies = S64_MAX;
> +	spin_unlock_irq(&ploop->pb_lock);
> +
> +	if (!bio_list_empty(&bio_list)) {
> +		defer_bio_list(ploop, &bio_list);
> +		if (has_more)
> +			mod_timer(&pb->deadline_timer, jiffies + BACKUP_DEADLINE * HZ + 1);
> +	}
> +
> +	return 0;
> +}
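> +
> +/*
> + * An illustrative userspace sequence for the push backup messages
> + * (the UUID and the numbers are made up; the <mask> argument of
> + * push_backup_start must be 0 until partial backup is implemented):
> + *
> + *	dmsetup suspend dm_ploop
> + *	dmsetup message dm_ploop 0 push_backup_start <uuid> 0
> + *	dmsetup resume dm_ploop
> + *	dmsetup message dm_ploop 0 push_backup_read <uuid>	(returns e.g. "5:4")
> + *	... back up clusters 5..8 ...
> + *	dmsetup message dm_ploop 0 push_backup_write <uuid> 5:4
> + *	dmsetup message dm_ploop 0 push_backup_stop <uuid>
> + */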
> +
> +/* Handle user commands requested via "message" interface */
> +void process_deferred_cmd(struct ploop *ploop, struct ploop_index_wb *piwb)
> +	__releases(&ploop->deferred_lock)
> +	__acquires(&ploop->deferred_lock)
> +{
> +	struct ploop_cmd *cmd = ploop->deferred_cmd;
> +
> +	if (likely(!cmd))
> +		return;
> +
> +	ploop->deferred_cmd = NULL;
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	/* There must not be a pending index wb */
> +	WARN_ON(piwb->page_nr != PAGE_NR_NONE);
> +
> +	if (cmd->type == PLOOP_CMD_RESIZE) {
> +		process_resize_cmd(ploop, piwb, cmd);
> +	} else if (cmd->type == PLOOP_CMD_ADD_DELTA) {
> +		process_add_delta_cmd(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_MERGE_SNAPSHOT) {
> +		process_merge_latest_snapshot_cmd(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_NOTIFY_DELTA_MERGED) {
> +		process_notify_delta_merged(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_SWITCH_TOP_DELTA) {
> +		process_switch_top_delta(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_UPDATE_DELTA_INDEX) {
> +		process_update_delta_index(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_TRACKING_START) {
> +		process_tracking_start(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_FLIP_UPPER_DELTAS) {
> +		process_flip_upper_deltas(ploop, cmd);
> +	} else if (cmd->type == PLOOP_CMD_SET_PUSH_BACKUP) {
> +		process_set_push_backup(ploop, cmd);
> +	} else {
> +		cmd->retval = -EINVAL;
> +		complete(&cmd->comp);
> +	}
> +	spin_lock_irq(&ploop->deferred_lock);
> +}
> +
> +int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
> +		  char *result, unsigned int maxlen)
> +{
> +	struct ploop *ploop = ti->private;
> +	bool forward = true;
> +	int ret = -EPERM;
> +	u64 val, val2;
> +
> +	if (!capable(CAP_SYS_ADMIN))
> +		goto out;
> +
> +	ret = -EINVAL;
> +	if (argc < 1)
> +		goto out;
> +
> +	mutex_lock(&ploop->ctl_mutex);
> +	if (!strcmp(argv[0], "resize")) {
> +		if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_resize(ploop, val);
> +	} else if (!strcmp(argv[0], "add_delta")) {
> +		if (argc != 2)
> +			goto unlock;
> +		ret = ploop_add_delta(ploop, argv[1]);
> +	} else if (!strcmp(argv[0], "merge")) {
> +		if (argc == 1)
> +			ret = ploop_merge_latest_snapshot(ploop);
> +	} else if (!strncmp(argv[0], "notify_merged_", 14)) {
> +		if (!strcmp(&argv[0][14], "backward"))
> +			forward = false;
> +		else if (strcmp(&argv[0][14], "forward"))
> +			goto unlock;
> +		if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_notify_merged(ploop, val, forward);
> +	} else if (!strcmp(argv[0], "get_delta_name")) {
> +		if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_get_delta_name_cmd(ploop, (u8)val, result, maxlen);
> +	} else if (!strcmp(argv[0], "update_delta_index")) {
> +		if (argc != 3 || kstrtou64(argv[1], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_update_delta_index(ploop, val, argv[2]);
> +	} else if (!strcmp(argv[0], "snapshot")) {
> +		if (argc != 3 || kstrtou64(argv[1], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_switch_top_delta(ploop, val, argv[2]);
> +	} else if (!strncmp(argv[0], "tracking_", 9)) {
> +		if (argc != 1)
> +			goto unlock;
> +		ret = ploop_tracking_cmd(ploop, argv[0] + 9, result, maxlen);
> +	} else if (!strcmp(argv[0], "set_noresume")) {
> +		if (argc != 2)
> +			goto unlock;
> +		ret = ploop_set_noresume(ploop, argv[1]);
> +	} else if (!strcmp(argv[0], "flip_upper_deltas")) {
> +		if (argc != 3)
> +			goto unlock;
> +		ret = ploop_flip_upper_deltas(ploop, argv[1], argv[2]);
> +	} else if (!strcmp(argv[0], "push_backup_start")) {
> +		if (argc != 3 || kstrtou64(argv[2], 10, &val) < 0)
> +			goto unlock;
> +		ret = ploop_push_backup_start(ploop, argv[1], (void *)val);
> +	} else if (!strcmp(argv[0], "push_backup_stop")) {
> +		if (argc != 2)
> +			goto unlock;
> +		ret = ploop_push_backup_stop(ploop, argv[1]);
> +	} else if (!strcmp(argv[0], "push_backup_get_uuid")) {
> +		if (argc != 1)
> +			goto unlock;
> +		ret = ploop_push_backup_get_uuid(ploop, result, maxlen);
> +	} else if (!strcmp(argv[0], "push_backup_read")) {
> +		if (argc != 2)
> +			goto unlock;
> +		ret = ploop_push_backup_read(ploop, argv[1], result, maxlen);
> +	} else if (!strcmp(argv[0], "push_backup_write")) {
> +		if (argc != 3 || sscanf(argv[2], "%llu:%llu", &val, &val2) != 2)
> +			goto unlock;
> +		ret = ploop_push_backup_write(ploop, argv[1], val, val2);
> +	} else {
> +		ret = -ENOTSUPP;
> +	}
> +
> +unlock:
> +	mutex_unlock(&ploop->ctl_mutex);
> +out:
> +	return ret;
> +}
> diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
> new file mode 100644
> index 000000000000..faa334e4d8a1
> --- /dev/null
> +++ b/drivers/md/dm-ploop-map.c
> @@ -0,0 +1,1719 @@
> +#include <linux/buffer_head.h>
> +#include <linux/dm-io.h>
> +#include <linux/dm-kcopyd.h>
> +#include <linux/init.h>
> +#include <linux/vmalloc.h>
> +#include <linux/uio.h>
> +#include "dm-ploop.h"
> +
> +/*
> + * The idea of this driver is that most of the time it does nothing:
> + * ploop_map() just replaces bio->bi_iter.bi_sector with the cluster value
> + * referred to in bat_entries[]. No kwork is involved, and all the work is
> + * delegated to the backing device (loop). The kwork starts only when a bio
> + * targets a cluster that is not present yet, or for service requests.
> + *
> + * Service operations are also made from the kwork, so sometimes we may
> + * avoid synchronization because of this. Two different service operations
> + * can't be executed in parallel.
> + *
> + * Discard handling begins with switching ploop into a special mode, where
> + * all requests are managed by the kwork, while all not-exclusive bios
> + * (e.g., READ or a simple WRITE) are linked to inflight_bios_rbtree.
> + * Discard bios are linked into exclusive_bios_rbtree, but their start is
> + * delayed till all not-exclusive bios going into the same cluster are
> + * finished. After an exclusive bio is started, the corresponding cluster
> + * becomes "locked", and all further bios going into the same cluster
> + * become delayed.
> + * Since switching into this mode is expensive, ploop remains in it for
> + * CLEANUP_DELAY seconds in the hope that a new discard bio will come.
> + * After this interval the device returns to normal mode, and ordinary
> + * bios are handled in ploop_map() as before.
> + */
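> +
> +/*
> + * A minimal worked example of the fast-path arithmetic (illustrative
> + * only; the real helpers live in dm-ploop.h). Assume cluster_log == 11,
> + * i.e. 2048 sectors (1M) per cluster. A bio at sector 6150 belongs to
> + * cluster 6150 >> 11 == 3, offset 6150 & 2047 == 6 inside the cluster.
> + * If bat_entries[3] == 42, ploop_map() effectively does
> + *
> + *	bio->bi_iter.bi_sector = (42 << 11) | 6;	/* == 86022 */
> + *
> + * and redirects the bio to the origin (loop) device.
> + */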
> +
> +#define DM_MSG_PREFIX "ploop"
> +
> +#define ploop_bat_lock(ploop, exclusive, flags)					\
> +	do {									\
> +		if (exclusive)							\
> +			write_lock_irqsave(&ploop->bat_rwlock, flags);		\
> +		else								\
> +			read_lock_irqsave(&ploop->bat_rwlock, flags);		\
> +	} while (0)
> +
> +#define ploop_bat_unlock(ploop, exclusive, flags)				\
> +	do {									\
> +		if (exclusive)							\
> +			write_unlock_irqrestore(&ploop->bat_rwlock, flags);	\
> +		else								\
> +			read_unlock_irqrestore(&ploop->bat_rwlock, flags);	\
> +	} while (0)
> +
> +/* Delta COW private */
> +struct ploop_cow {
> +	struct ploop *ploop;
> +	struct bio *cluster_bio;
> +	unsigned int dst_cluster;
> +
> +	struct dm_ploop_endio_hook hook;
> +
> +	void (*end_fn)(struct ploop *, int, void *);
> +	void *data;
> +};
> +
> +static void ploop_index_wb_init(struct ploop_index_wb *piwb, struct ploop *ploop)
> +{
> +	piwb->ploop = ploop;
> +	init_completion(&piwb->comp);
> +	spin_lock_init(&piwb->lock);
> +	piwb->bat_page = NULL;
> +	piwb->bat_bio = NULL;
> +	piwb->bi_status = 0;
> +	bio_list_init(&piwb->ready_data_bios);
> +	bio_list_init(&piwb->cow_list);
> +	/* For ploop_bat_write_complete() */
> +	atomic_set(&piwb->count, 1);
> +	piwb->completed = false;
> +	piwb->page_nr = PAGE_NR_NONE;
> +	piwb->type = PIWB_TYPE_ALLOC;
> +}
> +
> +static struct dm_ploop_endio_hook *bio_to_endio_hook(struct bio *bio)
> +{
> +	return dm_per_bio_data(bio, sizeof(struct dm_ploop_endio_hook));
> +}
> +
> +static void __ploop_init_end_io(struct ploop *ploop,
> +				struct dm_ploop_endio_hook *h)
> +{
> +	h->action = PLOOP_END_IO_NONE;
> +	h->ref_index = PLOOP_REF_INDEX_INVALID;
> +	h->piwb = NULL;
> +	memset(&h->list, 0, sizeof(h->list));
> +	h->endio_bio_list = NULL;
> +	/* FIXME: assign real cluster? */
> +	h->cluster = UINT_MAX;
> +	RB_CLEAR_NODE(&h->node);
> +}
> +
> +static void ploop_init_end_io(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	__ploop_init_end_io(ploop, h);
> +}
> +
> +static unsigned int bat_clu_to_page_nr(unsigned int cluster)
> +{
> +	unsigned int byte;
> +
> +	byte = (cluster + PLOOP_MAP_OFFSET) * sizeof(map_index_t);
> +	return byte >> PAGE_SHIFT;
> +}
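> +
> +/*
> + * Worked example (illustrative; the real constants come from dm-ploop.h):
> + * with PAGE_SIZE == 4096 and sizeof(map_index_t) == 4, one BAT page holds
> + * 1024 indexes, and PLOOP_MAP_OFFSET accounts for the ploop header at the
> + * start of page #0. E.g., assuming PLOOP_MAP_OFFSET == 16, cluster 1000
> + * maps to page (1000 + 16) * 4 / 4096 == 0, while cluster 1020 maps to
> + * page 1.
> + */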
> +
> +/* Get cluster related to bio sectors */
> +static int ploop_bio_cluster(struct ploop *ploop, struct bio *bio,
> +			     unsigned int *ret_cluster)
> +{
> +	sector_t sector = bio->bi_iter.bi_sector;
> +	unsigned int cluster, end_cluster;
> +	loff_t end_byte;
> +
> +	cluster = sector >> ploop->cluster_log;
> +	end_byte = ((sector << 9) + bio->bi_iter.bi_size - 1);
> +	end_cluster = end_byte >> (ploop->cluster_log + 9);
> +
> +	if (unlikely(cluster >= ploop->nr_bat_entries) ||
> +		     cluster != end_cluster) {
> +		/*
> +		 * This mustn't happen, since we set max_io_len
> +		 * via dm_set_target_max_io_len().
> +		 */
> +		WARN_ONCE(1, "sec=%lu, size=%u, clu=%u, end=%u, nr=%u\n",
> +			  sector, bio->bi_iter.bi_size, cluster,
> +			  end_cluster, ploop->nr_bat_entries);
> +		return -EINVAL;
> +	}
> +
> +	*ret_cluster = cluster;
> +	return 0;
> +}
> +
> +void defer_bio(struct ploop *ploop, struct bio *bio)
> +{
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	bio_list_add(&ploop->deferred_bios, bio);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +void defer_bio_list(struct ploop *ploop, struct bio_list *bio_list)
> +{
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	bio_list_merge(&ploop->deferred_bios, bio_list);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +	queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +/*
> + * Userspace finally calls dm_suspend() to collect the changed blocks.
> + * dm_suspend() waits for dm's inflight bios, so this function
> + * must be called after @bio is written and before @bio is ended.
> + * The only possible exception is writes driven by the "message" ioctl.
> + * Thus, userspace mustn't do maintenance operations in parallel
> + * with tracking.
> + */
> +void __track_bio(struct ploop *ploop, struct bio *bio)
> +{
> +	unsigned int dst_cluster = bio->bi_iter.bi_sector >> ploop->cluster_log;
> +	unsigned long flags;
> +
> +	if (!op_is_write(bio->bi_opf) || !bio_sectors(bio))
> +		return;
> +
> +	WARN_ON_ONCE(bio->bi_disk != ploop->origin_dev->bdev->bd_disk);
> +
> +	read_lock_irqsave(&ploop->bat_rwlock, flags);
> +	if (ploop->tracking_bitmap && !WARN_ON(dst_cluster >= ploop->tb_nr))
> +		set_bit(dst_cluster, ploop->tracking_bitmap);
> +	read_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +}
> +
> +static void queue_discard_index_wb(struct ploop *ploop, struct bio *bio)
> +{
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	bio_list_add(&ploop->discard_bios, bio);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +/* This 1) defers discard bios that look suitable and 2) ends the rest of them. */
> +static int ploop_map_discard(struct ploop *ploop, struct bio *bio)
> +{
> +	bool supported = false;
> +	unsigned int cluster;
> +	unsigned long flags;
> +
> +	/* Only a whole cluster (and only without snapshots) can be discarded. */
> +	if (whole_cluster(ploop, bio)) {
> +		cluster = bio->bi_iter.bi_sector >> ploop->cluster_log;
> +		read_lock_irqsave(&ploop->bat_rwlock, flags);
> +		/* Early checks to not wake up work for nought. */
> +		if (cluster_is_in_top_delta(ploop, cluster) &&
> +		    !ploop->nr_deltas)
> +			supported = true;
> +		read_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +	}
> +
> +	if (supported) {
> +		defer_bio(ploop, bio);
> +	} else {
> +		bio->bi_status = BLK_STS_NOTSUPP;
> +		bio_endio(bio);
> +	}
> +
> +	return DM_MAPIO_SUBMITTED;
> +}
> +
> +struct dm_ploop_endio_hook *find_endio_hook(struct ploop *ploop,
> +					    struct rb_root *root,
> +					    unsigned int cluster)
> +{
> +	struct rb_node *node = root->rb_node;
> +	struct dm_ploop_endio_hook *h;
> +
> +	while (node) {
> +		h = rb_entry(node, struct dm_ploop_endio_hook, node);
> +		if (cluster < h->cluster)
> +			node = node->rb_left;
> +		else if (cluster > h->cluster)
> +			node = node->rb_right;
> +		else
> +			return h;
> +	}
> +
> +	return NULL;
> +}
> +
> +static struct dm_ploop_endio_hook *find_inflight_bio(struct ploop *ploop,
> +						     unsigned int cluster)
> +{
> +	lockdep_assert_held(&ploop->deferred_lock);
> +	return find_endio_hook(ploop, &ploop->inflight_bios_rbtree, cluster);
> +}
> +
> +struct dm_ploop_endio_hook *find_lk_of_cluster(struct ploop *ploop,
> +					       unsigned int cluster)
> +{
> +	lockdep_assert_held(&ploop->deferred_lock);
> +	return find_endio_hook(ploop, &ploop->exclusive_bios_rbtree, cluster);
> +}
> +
> +static void add_endio_bio(struct dm_ploop_endio_hook *h, struct bio *later_bio)
> +{
> +	later_bio->bi_next = h->endio_bio_list;
> +	h->endio_bio_list = later_bio;
> +}
> +
> +static void inc_nr_inflight_raw(struct ploop *ploop,
> +				struct dm_ploop_endio_hook *h)
> +{
> +	unsigned char ref_index = ploop->inflight_bios_ref_index;
> +
> +	if (!WARN_ON_ONCE(h->ref_index != PLOOP_REF_INDEX_INVALID)) {
> +		percpu_ref_get(&ploop->inflight_bios_ref[ref_index]);
> +		h->ref_index = ref_index;
> +	}
> +}
> +
> +static void inc_nr_inflight(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	inc_nr_inflight_raw(ploop, h);
> +}
> +
> +/*
> + * Note that do_ploop_work() waits for the final ref dec_nr_inflight()
> + * (e.g., on grow), so the code decrementing the counter must not
> + * depend on the work or on any actions it performs.
> + *
> + * The only intended use case is that the counter is decremented
> + * from the endio of bios submitted to the underlying device (loop) or
> + * from the ki_complete of requests submitted to delta files
> + * (while the increment occurs right before submitting).
> + */
> +static void dec_nr_inflight_raw(struct ploop *ploop,
> +				struct dm_ploop_endio_hook *h)
> +{
> +	if (h->ref_index != PLOOP_REF_INDEX_INVALID) {
> +		percpu_ref_put(&ploop->inflight_bios_ref[h->ref_index]);
> +		h->ref_index = PLOOP_REF_INDEX_INVALID;
> +	}
> +}
> +
> +static void dec_nr_inflight(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	dec_nr_inflight_raw(ploop, h);
> +}
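> +
> +/*
> + * A sketch of how the two inflight_bios_ref counters are intended to be
> + * switched (an assumption for illustration -- the real
> + * ploop_inflight_bios_ref_switch() is defined elsewhere in this patch):
> + *
> + *	index = ploop->inflight_bios_ref_index;
> + *	ploop->inflight_bios_ref_index = !index;	/* new bios grab the other ref */
> + *	percpu_ref_kill(&ploop->inflight_bios_ref[index]);
> + *	wait_for_completion(&ploop->inflight_bios_ref_comp);
> + *	percpu_ref_reinit(&ploop->inflight_bios_ref[index]);
> + *
> + * After this the caller knows every bio which took the old ref has
> + * already passed dec_nr_inflight().
> + */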
> +
> +static void link_endio_hook(struct ploop *ploop, struct dm_ploop_endio_hook *new,
> +		      struct rb_root *root, unsigned int cluster, bool exclusive)
> +{
> +	struct rb_node *parent, **node = &root->rb_node;
> +	struct dm_ploop_endio_hook *h;
> +
> +	BUG_ON(!RB_EMPTY_NODE(&new->node));
> +	parent = NULL;
> +
> +	while (*node) {
> +		h = rb_entry(*node, struct dm_ploop_endio_hook, node);
> +		parent = *node;
> +		if (cluster < h->cluster)
> +			node = &parent->rb_left;
> +		else if (cluster > h->cluster)
> +			node = &parent->rb_right;
> +		else {
> +			if (exclusive)
> +				BUG();
> +			if (new < h)
> +				node = &parent->rb_left;
> +			else if (new > h)
> +				node = &parent->rb_right;
> +			else
> +				BUG();
> +		}
> +	}
> +
> +	new->cluster = cluster;
> +	rb_link_node(&new->node, parent, node);
> +	rb_insert_color(&new->node, root);
> +}
> +
> +/*
> + * Removes endio hook of completed bio either from inflight_bios_rbtree
> + * or from exclusive_bios_rbtree. BIOs from endio_bio_list are requeued
> + * to deferred_list.
> + */
> +static void unlink_endio_hook(struct ploop *ploop, struct rb_root *root,
> +		struct dm_ploop_endio_hook *h, struct bio_list *bio_list)
> +{
> +	struct bio *iter;
> +
> +	BUG_ON(RB_EMPTY_NODE(&h->node));
> +
> +	rb_erase(&h->node, root);
> +	RB_CLEAR_NODE(&h->node);
> +	while ((iter = h->endio_bio_list) != NULL) {
> +		h->endio_bio_list = iter->bi_next;
> +		iter->bi_next = NULL;
> +		bio_list_add(bio_list, iter);
> +	}
> +}
> +
> +static void add_cluster_lk(struct ploop *ploop, struct dm_ploop_endio_hook *h,
> +			   unsigned int cluster)
> +{
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	link_endio_hook(ploop, h, &ploop->exclusive_bios_rbtree, cluster, true);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +}
> +static void del_cluster_lk(struct ploop *ploop, struct dm_ploop_endio_hook *h)
> +{
> +	struct bio_list bio_list = BIO_EMPTY_LIST;
> +	unsigned long flags;
> +	bool queue = false;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	unlink_endio_hook(ploop, &ploop->exclusive_bios_rbtree, h, &bio_list);
> +	if (!bio_list_empty(&bio_list)) {
> +		bio_list_merge(&ploop->deferred_bios, &bio_list);
> +		queue = true;
> +	}
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	if (queue)
> +		queue_work(ploop->wq, &ploop->worker);
> +
> +}
> +
> +static void maybe_link_submitting_bio(struct ploop *ploop, struct bio *bio,
> +				      unsigned int cluster)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +	unsigned long flags;
> +
> +	if (!ploop->force_link_inflight_bios)
> +		return;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	link_endio_hook(ploop, h, &ploop->inflight_bios_rbtree, cluster, false);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +}
> +static void maybe_unlink_completed_bio(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +	struct bio_list bio_list = BIO_EMPTY_LIST;
> +	unsigned long flags;
> +	bool queue = false;
> +
> +	if (likely(RB_EMPTY_NODE(&h->node)))
> +		return;
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	unlink_endio_hook(ploop, &ploop->inflight_bios_rbtree, h, &bio_list);
> +	if (!bio_list_empty(&bio_list)) {
> +		bio_list_merge(&ploop->deferred_bios, &bio_list);
> +		queue = true;
> +	}
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	if (queue)
> +		queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +static void handle_discard_bio(struct ploop *ploop, struct bio *bio,
> +		     unsigned int cluster, unsigned int dst_cluster)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +	struct dm_ploop_endio_hook *inflight_h;
> +	unsigned long flags;
> +
> +	if (!cluster_is_in_top_delta(ploop, cluster) || ploop->nr_deltas) {
> +		bio->bi_status = BLK_STS_NOTSUPP;
> +		bio_endio(bio);
> +		return;
> +	}
> +
> +	if (!ploop->force_link_inflight_bios) {
> +		/*
> +		 * Switch ploop into the mode where requests are handled
> +		 * from kwork only, and force all not-exclusive
> +		 * inflight bios to be linked into inflight_bios_rbtree.
> +		 * Note that this does not wait for completion of
> +		 * two-stage requests (currently these may only be
> +		 * COW, which takes the cluster lock, so we are safe
> +		 * with them).
> +		 */
> +		ploop->force_link_inflight_bios = true;
> +		force_defer_bio_count_inc(ploop);
> +		ploop_inflight_bios_ref_switch(ploop);
> +	}
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	inflight_h = find_inflight_bio(ploop, cluster);
> +	if (inflight_h)
> +		add_endio_bio(inflight_h, bio);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	if (inflight_h) {
> +		/* @bio will be requeued on inflight_h's bio end */
> +		pr_err_once("ploop: delayed discard: device is used as raw?\n");
> +		return;
> +	}
> +
> +	h->action = PLOOP_END_IO_DISCARD_BIO;
> +	add_cluster_lk(ploop, h, cluster);
> +
> +	read_lock_irq(&ploop->bat_rwlock);
> +	inc_nr_inflight(ploop, bio);
> +	read_unlock_irq(&ploop->bat_rwlock);
> +	atomic_inc(&ploop->nr_discard_bios);
> +
> +	remap_to_cluster(ploop, bio, dst_cluster);
> +	remap_to_origin(ploop, bio);
> +	generic_make_request(bio);
> +}
> +
> +static int ploop_discard_bio_end(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	dec_nr_inflight(ploop, bio);
> +	if (bio->bi_status == BLK_STS_OK)
> +		queue_discard_index_wb(ploop, bio);
> +	else
> +		h->action = PLOOP_END_IO_DISCARD_INDEX_BIO;
> +	return DM_ENDIO_INCOMPLETE;
> +}
> +
> +static int ploop_discard_index_bio_end(struct ploop *ploop, struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	del_cluster_lk(ploop, h);
> +
> +	WRITE_ONCE(ploop->pending_discard_cleanup, jiffies);
> +	/* Pairs with barrier in do_discard_cleanup() */
> +	smp_mb__before_atomic();
> +	atomic_dec(&ploop->nr_discard_bios);
> +	return DM_ENDIO_DONE;
> +}
> +
> +static void complete_cow(struct ploop_cow *cow, blk_status_t bi_status)
> +{
> +	unsigned int dst_cluster = cow->dst_cluster;
> +	struct bio *cluster_bio = cow->cluster_bio;
> +	struct ploop *ploop = cow->ploop;
> +	struct dm_ploop_endio_hook *h;
> +	unsigned long flags;
> +
> +	WARN_ON_ONCE(cluster_bio->bi_next);
> +	h = &cow->hook;
> +
> +	del_cluster_lk(ploop, h);
> +
> +	if (dst_cluster != BAT_ENTRY_NONE && bi_status != BLK_STS_OK) {
> +		read_lock_irqsave(&ploop->bat_rwlock, flags);
> +		ploop_hole_set_bit(dst_cluster, ploop);
> +		read_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +	}
> +
> +	if (cow->end_fn)
> +		cow->end_fn(ploop, blk_status_to_errno(bi_status), cow->data);
> +
> +	queue_work(ploop->wq, &ploop->worker);
> +	free_bio_with_pages(ploop, cow->cluster_bio);
> +	kfree(cow);
> +}
> +
> +static void piwb_discard_completed(struct ploop *ploop, bool success,
> +		  unsigned int cluster, unsigned int new_dst_cluster)
> +{
> +	unsigned int dst_cluster;
> +
> +	if (new_dst_cluster)
> +		return;
> +
> +	if (cluster_is_in_top_delta(ploop, cluster)) {
> +		WARN_ON_ONCE(ploop->nr_deltas);
> +		if (success) {
> +			dst_cluster = ploop->bat_entries[cluster];
> +			ploop->bat_entries[cluster] = BAT_ENTRY_NONE;
> +			ploop->bat_levels[cluster] = 0;
> +			ploop_hole_set_bit(dst_cluster, ploop);
> +		}
> +	}
> +}
> +
> +/*
> + * Update local BAT copy with written indexes on success.
> + * Mark allocated clusters as holes on failure.
> + * FIXME: a failure may mean some sectors are written, so
> + * we have to reread BAT page to check that.
> + */
> +static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
> +					     struct ploop_index_wb *piwb,
> +					     bool success)
> +{
> +	map_index_t *dst_cluster, off;
> +	unsigned int i, last;
> +	unsigned long flags;
> +
> +	/* Absolute number of first index in page (negative for page#0) */
> +	off = piwb->page_nr * PAGE_SIZE / sizeof(map_index_t);
> +	off -= PLOOP_MAP_OFFSET;
> +
> +	/* Last and first index in copied page */
> +	last = ploop->nr_bat_entries - off;
> +	if (last > PAGE_SIZE / sizeof(map_index_t))
> +		last = PAGE_SIZE / sizeof(map_index_t);
> +	i = 0;
> +	if (!piwb->page_nr)
> +		i = PLOOP_MAP_OFFSET;
> +
> +	dst_cluster = kmap_atomic(piwb->bat_page);
> +	ploop_bat_lock(ploop, success, flags);
> +
> +	for (; i < last; i++) {
> +		if (piwb->type == PIWB_TYPE_DISCARD) {
> +			piwb_discard_completed(ploop, success, i + off, dst_cluster[i]);
> +			continue;
> +		}
> +
> +		if (!dst_cluster[i])
> +			continue;
> +
> +		if (cluster_is_in_top_delta(ploop, i + off) && piwb->type == PIWB_TYPE_ALLOC) {
> +			WARN_ON(ploop->bat_entries[i + off] != dst_cluster[i]);
> +			continue;
> +		}
> +
> +		if (success) {
> +			ploop->bat_entries[i + off] = dst_cluster[i];
> +			ploop->bat_levels[i + off] = BAT_LEVEL_TOP;
> +		} else {
> +			/*
> +			 * Although set_bit() is atomic, we take read_lock()
> +			 * to access ploop->bat_entries[] above (really it's
> +			 * not needed, since a new wb to this page can't
> +			 * start before this wb has ended).
> +			 */
> +			ploop_hole_set_bit(i + off, ploop);
> +		}
> +	}
> +
> +	ploop_bat_unlock(ploop, success, flags);
> +	kunmap_atomic(dst_cluster);
> +}
> +
> +static void put_piwb(struct ploop_index_wb *piwb)
> +{
> +	if (atomic_dec_and_test(&piwb->count)) {
> +		struct ploop *ploop = piwb->ploop;
> +		/*
> +		 * Index wb failed. Mark clusters as unallocated again.
> +		 * piwb->count is zero, so all data writers have completed.
> +		 */
> +		if (piwb->bi_status)
> +			ploop_advance_local_after_bat_wb(ploop, piwb, false);
> +
> +		complete(&piwb->comp);
> +	}
> +}
> +
> +/* This handler is called after BAT is updated. */
> +static void ploop_bat_write_complete(struct bio *bio)
> +{
> +	struct ploop_index_wb *piwb = bio->bi_private;
> +	struct bio *data_bio, *cluster_bio;
> +	struct ploop *ploop = piwb->ploop;
> +	struct ploop_cow *cow;
> +	unsigned long flags;
> +
> +	track_bio(ploop, bio);
> +
> +	spin_lock_irqsave(&piwb->lock, flags);
> +	piwb->completed = true;
> +	piwb->bi_status = bio->bi_status;
> +	spin_unlock_irqrestore(&piwb->lock, flags);
> +
> +	/*
> +	 * End pending data bios. Unlocked, as nobody can
> +	 * add a new element after piwb->completed is true.
> +	 */
> +	while ((data_bio = bio_list_pop(&piwb->ready_data_bios))) {
> +		if (bio->bi_status)
> +			data_bio->bi_status = bio->bi_status;
> +		if (data_bio->bi_end_io)
> +			data_bio->bi_end_io(data_bio);
> +	}
> +
> +	while ((cluster_bio = bio_list_pop(&piwb->cow_list))) {
> +		cow = cluster_bio->bi_private;
> +		complete_cow(cow, bio->bi_status);
> +	}
> +
> +	if (!piwb->bi_status) {
> +		/*
> +		 * Success: now update local BAT copy. We could do this
> +		 * from our delayed work, but we want to publish new
> +		 * mapping in the fastest way.
> +		 */
> +		ploop_advance_local_after_bat_wb(ploop, piwb, true);
> +	}
> +
> +	/*
> +	 * If the BAT update failed, dst_clusters will be
> +	 * returned to holes_bitmap on the last put_piwb().
> +	 */
> +	put_piwb(piwb);
> +}
> +
> +static int ploop_prepare_bat_update(struct ploop *ploop, unsigned int page_nr,
> +				    struct ploop_index_wb *piwb)
> +{
> +	unsigned int i, off, last;
> +	struct page *page;
> +	struct bio *bio;
> +	map_index_t *to;
> +	sector_t sector;
> +
> +	piwb->bat_page = page = alloc_page(GFP_NOIO);
> +	if (!page)
> +		return -ENOMEM;
> +	piwb->bat_bio = bio = bio_alloc(GFP_NOIO, 1);
> +	if (!bio) {
> +		put_page(page);
> +		piwb->bat_page = NULL;
> +		return -ENOMEM;
> +	}
> +
> +	piwb->page_nr = page_nr;
> +	to = kmap_atomic(page);
> +	memset((void *)to, 0, PAGE_SIZE);
> +
> +	/* Absolute number of first index in page (negative for page#0) */
> +	off = page_nr * PAGE_SIZE / sizeof(map_index_t);
> +	off -= PLOOP_MAP_OFFSET;
> +
> +	/* Last and first index in copied page */
> +	last = ploop->nr_bat_entries - off;
> +	if (last > PAGE_SIZE / sizeof(map_index_t))
> +		last = PAGE_SIZE / sizeof(map_index_t);
> +	i = 0;
> +	if (!page_nr) {
> +		i = PLOOP_MAP_OFFSET;
> +		memcpy(to, ploop->hdr, sizeof(*ploop->hdr));
> +	}
> +
> +	/* Copy BAT (BAT goes right after hdr, see .ctr) */
> +	for (; i < last; i++) {
> +		if (!cluster_is_in_top_delta(ploop, i + off))
> +			continue;
> +		to[i] = ploop->bat_entries[i + off];
> +	}
> +
> +	kunmap_atomic(to);
> +
> +	sector = (page_nr * PAGE_SIZE) >> SECTOR_SHIFT;
> +	bio->bi_iter.bi_sector = sector;
> +	remap_to_origin(ploop, bio);
> +
> +	bio->bi_private = piwb;
> +	bio->bi_end_io = ploop_bat_write_complete;
> +	bio_set_op_attrs(bio, REQ_OP_WRITE, REQ_SYNC | REQ_FUA | REQ_PREFLUSH);
> +	bio_add_page(bio, page, PAGE_SIZE, 0);
> +
> +	return 0;
> +}
> +
> +void ploop_reset_bat_update(struct ploop_index_wb *piwb)
> +{
> +	struct ploop *ploop = piwb->ploop;
> +
> +	put_page(piwb->bat_page);
> +	bio_put(piwb->bat_bio);
> +	ploop_index_wb_init(piwb, ploop);
> +}
> +
> +static void ploop_bat_page_zero_cluster(struct ploop *ploop,
> +					struct ploop_index_wb *piwb,
> +					unsigned int cluster)
> +{
> +	map_index_t *to;
> +
> +	/* Cluster index related to the page[page_nr] start */
> +	cluster -= piwb->page_nr * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
> +
> +	to = kmap_atomic(piwb->bat_page);
> +	to[cluster] = 0;
> +	kunmap_atomic(to);
> +}
> +
> +/*
> + * This finds a free dst_cluster on origin device, and reflects this
> + * in ploop->holes_bitmap and bat_page.
> + */
> +static int ploop_alloc_cluster(struct ploop *ploop, struct ploop_index_wb *piwb,
> +			       unsigned int cluster, unsigned int *dst_cluster)
> +{
> +	struct page *page = piwb->bat_page;
> +	map_index_t *to;
> +	int ret = 0;
> +
> +	/* Cluster index related to the page[page_nr] start */
> +	cluster -= piwb->page_nr * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
> +
> +	to = kmap_atomic(page);
> +	if (to[cluster]) {
> +		/* Already mapped by one of previous bios */
> +		*dst_cluster = to[cluster];
> +		goto unmap;
> +	}
> +
> +	/* Find empty cluster */
> +	*dst_cluster = find_first_bit(ploop->holes_bitmap, ploop->hb_nr);
> +	if (*dst_cluster >= ploop->hb_nr) {
> +		ret = -EIO;
> +		goto unmap;
> +	}
> +
> +	/*
> +	 * Mark cluster as used. Find & clear bit is unlocked,
> +	 * since currently this may be called only from deferred
> +	 * kwork. Note that set_bit may be done from many places.
> +	 */
> +	ploop_hole_clear_bit(*dst_cluster, ploop);
> +
> +	to[cluster] = *dst_cluster;
> +unmap:
> +	kunmap_atomic(to);
> +	return ret;
> +}
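> +
> +/*
> + * Note on holes_bitmap semantics (as used above and in the COW/discard
> + * paths): a set bit means the corresponding cluster of the image file is
> + * free. Allocation clears the bit (ploop_hole_clear_bit()), while a failed
> + * index writeback, a failed COW or a successful discard sets it back
> + * (ploop_hole_set_bit()).
> + */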
> +
> +static int ploop_data_bio_end(struct bio *bio)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +	struct ploop_index_wb *piwb = h->piwb;
> +	unsigned long flags;
> +	bool completed;
> +
> +	spin_lock_irqsave(&piwb->lock, flags);
> +	completed = piwb->completed;
> +	if (!completed)
> +		bio_list_add(&piwb->ready_data_bios, bio);
> +	else if (!bio->bi_status)
> +		bio->bi_status = piwb->bi_status;
> +	spin_unlock_irqrestore(&piwb->lock, flags);
> +
> +	dec_nr_inflight(piwb->ploop, bio);
> +
> +	if (!completed)
> +		return DM_ENDIO_INCOMPLETE;
> +
> +	put_piwb(piwb);
> +	return DM_ENDIO_DONE;
> +}
> +
> +static bool ploop_attach_end_action(struct bio *bio, struct ploop_index_wb *piwb)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +
> +	if (WARN_ON_ONCE(h->action != PLOOP_END_IO_NONE)) {
> +		h->action = PLOOP_END_IO_NONE;
> +		return false;
> +	}
> +
> +	/* Currently this can't fail. */
> +	if (!atomic_inc_not_zero(&piwb->count))
> +		return false;
> +
> +	h->action = PLOOP_END_IO_DATA_BIO;
> +	h->piwb = piwb;
> +
> +	return true;
> +}
> +
> +struct ploop_iocb {
> +	struct kiocb iocb;
> +	struct bio *bio;
> +	atomic_t count;
> +};
> +
> +static void ploop_read_aio_do_completion(struct ploop_iocb *piocb)
> +{
> +	struct bio *bio = piocb->bio;
> +
> +	if (!atomic_dec_and_test(&piocb->count))
> +		return;
> +	bio_endio(bio);
> +	kfree(piocb);
> +}
> +
> +static void ploop_read_aio_complete(struct kiocb *iocb, long ret, long ret2)
> +{
> +	struct ploop_iocb *piocb = container_of(iocb, struct ploop_iocb, iocb);
> +	struct bio *bio = piocb->bio;
> +
> +	if (ret != bio->bi_iter.bi_size)
> +		bio->bi_status = BLK_STS_IOERR;
> +	else
> +		bio->bi_status = BLK_STS_OK;
> +	ploop_read_aio_do_completion(piocb);
> +}
> +
> +/*
> + * Read a cluster or its part from a secondary delta.
> + * @bio is either dm's or a plain one (w/o dm_ploop_endio_hook container
> + * and ploop_endio()).
> + * Note that nr_inflight is not incremented here; this is delegated to
> + * the caller (if needed).
> + */
> +static void submit_delta_read(struct ploop *ploop, unsigned int level,
> +			    unsigned int dst_cluster, struct bio *bio)
> +{
> +	struct ploop_iocb *piocb;
> +	struct bio_vec *bvec;
> +	struct iov_iter iter;
> +	unsigned int offset;
> +	struct file *file;
> +	loff_t pos;
> +	int ret;
> +
> +	piocb = kzalloc(sizeof(*piocb), GFP_NOIO); /* This may be improved */
> +	if (!piocb) {
> +		bio->bi_status = BLK_STS_RESOURCE;
> +		bio_endio(bio);
> +		return;
> +	}
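> +	/*
> +	 * Two references: one is dropped by ki_complete() (either called
> +	 * by the aio machinery or manually below on a synchronous return),
> +	 * the other is dropped unconditionally after call_read_iter(), so
> +	 * bio_endio() runs exactly once in both cases.
> +	 */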
> +	atomic_set(&piocb->count, 2);
> +	piocb->bio = bio;
> +
> +	remap_to_cluster(ploop, bio, dst_cluster);
> +
> +	bvec = __bvec_iter_bvec(bio->bi_io_vec, bio->bi_iter);
> +	offset = bio->bi_iter.bi_bvec_done;
> +
> +	iov_iter_bvec(&iter, READ|ITER_BVEC, bvec, 1, bio->bi_iter.bi_size);
> +	iter.iov_offset = offset;
> +
> +	pos = (bio->bi_iter.bi_sector << SECTOR_SHIFT);
> +	file = ploop->deltas[level];
> +
> +	piocb->iocb.ki_pos = pos;
> +	piocb->iocb.ki_filp = file;
> +	piocb->iocb.ki_complete = ploop_read_aio_complete;
> +	piocb->iocb.ki_flags = IOCB_DIRECT;
> +	piocb->iocb.ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
> +
> +	ret = call_read_iter(file, &piocb->iocb, &iter);
> +
> +	ploop_read_aio_do_completion(piocb);
> +
> +	if (ret != -EIOCBQUEUED)
> +		piocb->iocb.ki_complete(&piocb->iocb, ret, 0);
> +}
> +
> +static void initiate_delta_read(struct ploop *ploop, unsigned int level,
> +				unsigned int dst_cluster, struct bio *bio)
> +{
> +	if (dst_cluster == BAT_ENTRY_NONE) {
> +		/* No delta contains this cluster. */
> +		zero_fill_bio(bio);
> +		bio_endio(bio);
> +		return;
> +	}
> +
> +	submit_delta_read(ploop, level, dst_cluster, bio);
> +}
> +
> +static void ploop_cow_endio(struct bio *cluster_bio)
> +{
> +	struct ploop_cow *cow = cluster_bio->bi_private;
> +	struct ploop *ploop = cow->ploop;
> +	unsigned long flags;
> +
> +	track_bio(ploop, cluster_bio);
> +
> +	spin_lock_irqsave(&ploop->deferred_lock, flags);
> +	bio_list_add(&ploop->delta_cow_action_list, cluster_bio);
> +	spin_unlock_irqrestore(&ploop->deferred_lock, flags);
> +
> +	dec_nr_inflight_raw(ploop, &cow->hook);
> +	queue_work(ploop->wq, &ploop->worker);
> +}
> +
> +static bool postpone_if_cluster_locked(struct ploop *ploop, struct bio *bio,
> +				       unsigned int cluster)
> +{
> +	struct dm_ploop_endio_hook *e_h; /* Exclusively locked */
> +
> +	spin_lock_irq(&ploop->deferred_lock);
> +	e_h = find_lk_of_cluster(ploop, cluster);
> +	if (e_h)
> +		add_endio_bio(e_h, bio);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	return e_h != NULL;
> +}
> +
> +static bool postpone_if_required_for_backup(struct ploop *ploop,
> +			  struct bio *bio, unsigned int cluster)
> +{
> +	struct push_backup *pb = ploop->pb;
> +	struct dm_ploop_endio_hook *h;
> +	bool queue_timer = false;
> +
> +	if (likely(!pb || !pb->alive))
> +		return false;
> +	if (!op_is_write(bio->bi_opf))
> +		return false;
> +	if (!test_bit(cluster, pb->ppb_map))
> +		return false;
> +	spin_lock_irq(&ploop->pb_lock);
> +	if (!test_bit(cluster, pb->ppb_map)) {
> +		spin_unlock_irq(&ploop->pb_lock);
> +		return false;
> +	}
> +
> +	h = find_endio_hook(ploop, &pb->rb_root, cluster);
> +	if (h) {
> +		add_endio_bio(h, bio);
> +		spin_unlock_irq(&ploop->pb_lock);
> +		return true;
> +	}
> +
> +	if (RB_EMPTY_ROOT(&pb->rb_root)) {
> +		pb->deadline_jiffies = get_jiffies_64() + BACKUP_DEADLINE * HZ;
> +		queue_timer = true;
> +	}
> +
> +	h = bio_to_endio_hook(bio);
> +	link_endio_hook(ploop, h, &pb->rb_root, cluster, true);
> +	list_add_tail(&h->list, &pb->pending);
> +	spin_unlock_irq(&ploop->pb_lock);
> +
> +	if (queue_timer)
> +		mod_timer(&pb->deadline_timer, jiffies + BACKUP_DEADLINE * HZ + 1);
> +
> +	return true;
> +}
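> +
> +/*
> + * Push backup flow of a single WRITE bio (as implemented above and in
> + * ploop_push_backup_read()/ploop_push_backup_write()): a write into a
> + * cluster whose bit is still set in pb->ppb_map is linked into
> + * pb->rb_root and queued on pb->pending; userspace picks the range up
> + * via the push_backup_read message, backs those clusters up, and then
> + * acknowledges them with push_backup_write, which clears the ppb_map
> + * bits and requeues the postponed bios. If userspace does not
> + * acknowledge in time, the deadline timer fires and cleanup_backup()
> + * releases everything.
> + */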
> +
> +int submit_cluster_cow(struct ploop *ploop, unsigned int level,
> +		       unsigned int cluster, unsigned int dst_cluster,
> +		       void (*end_fn)(struct ploop *, int, void *), void *data)
> +{
> +	struct bio *bio = NULL;
> +	struct ploop_cow *cow;
> +
> +	/* Prepare new delta read */
> +	bio = alloc_bio_with_pages(ploop);
> +	if (!bio)
> +		goto err;
> +
> +	cow = kmalloc(sizeof(*cow), GFP_NOIO);
> +	if (!cow)
> +		goto err;
> +
> +	cow->ploop = ploop;
> +	cow->dst_cluster = BAT_ENTRY_NONE;
> +	cow->cluster_bio = bio;
> +	cow->end_fn = end_fn;
> +	cow->data = data;
> +
> +	bio_prepare_offsets(ploop, bio, cluster);
> +	bio_set_op_attrs(bio, REQ_OP_READ, 0);
> +	bio->bi_end_io = ploop_cow_endio;
> +	bio->bi_private = cow;
> +
> +	__ploop_init_end_io(ploop, &cow->hook);
> +	add_cluster_lk(ploop, &cow->hook, cluster);
> +
> +	/* Stage #0: read secondary delta full cluster */
> +	submit_delta_read(ploop, level, dst_cluster, bio);
> +	return 0;
> +err:
> +	if (bio)
> +		free_bio_with_pages(ploop, bio);
> +	return -ENOMEM;
> +}
> +
> +static void queue_or_fail(struct ploop *ploop, int err, void *data)
> +{
> +	struct bio *bio = data;
> +
> +	if (err && err != -EAGAIN) {
> +		bio->bi_status = errno_to_blk_status(err);
> +		bio_endio(bio);
> +	} else {
> +		defer_bio(ploop, bio);
> +	}
> +}
> +
> +static void initiate_cluster_cow(struct ploop *ploop, unsigned int level,
> +		unsigned int cluster, unsigned int dst_cluster, struct bio *bio)
> +{
> +	if (!submit_cluster_cow(ploop, level, cluster, dst_cluster,
> +				queue_or_fail, bio))
> +		return;
> +
> +	bio->bi_status = BLK_STS_RESOURCE;
> +	bio_endio(bio);
> +}
> +
> +static void submit_cluster_write(struct ploop_cow *cow)
> +{
> +	struct bio *bio = cow->cluster_bio;
> +	struct ploop *ploop = cow->ploop;
> +	unsigned int dst_cluster;
> +
> +	dst_cluster = find_first_bit(ploop->holes_bitmap, ploop->hb_nr);
> +	if (dst_cluster >= ploop->hb_nr)
> +		goto error;
> +	ploop_hole_clear_bit(dst_cluster, ploop);
> +	cow->dst_cluster = dst_cluster;
> +
> +	bio_reset(bio);
> +	bio_prepare_offsets(ploop, bio, dst_cluster);
> +	bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
> +	remap_to_origin(ploop, bio);
> +
> +	BUG_ON(irqs_disabled());
> +	read_lock_irq(&ploop->bat_rwlock);
> +	inc_nr_inflight_raw(ploop, &cow->hook);
> +	read_unlock_irq(&ploop->bat_rwlock);
> +	bio->bi_end_io = ploop_cow_endio;
> +	bio->bi_private = cow;
> +
> +	submit_bio(bio);
> +	return;
> +error:
> +	complete_cow(cow, BLK_STS_IOERR);
> +}
> +
> +static void submit_cow_index_wb(struct ploop_cow *cow,
> +				struct ploop_index_wb *piwb)
> +{
> +	struct dm_ploop_endio_hook *h = &cow->hook;
> +	unsigned int cluster = h->cluster;
> +	struct ploop *ploop = cow->ploop;
> +	unsigned int page_nr;
> +	map_index_t *to;
> +
> +	page_nr = bat_clu_to_page_nr(cluster);
> +
> +	if (piwb->page_nr == PAGE_NR_NONE) {
> +		/* No index wb in process. Prepare a new one */
> +		if (ploop_prepare_bat_update(ploop, page_nr, piwb) < 0)
> +			goto err_resource;
> +	}
> +
> +	if (piwb->page_nr != page_nr || piwb->type != PIWB_TYPE_ALLOC) {
> +		/* Another BAT page wb is in process */
> +		spin_lock_irq(&ploop->deferred_lock);
> +		bio_list_add(&ploop->delta_cow_action_list, cow->cluster_bio);
> +		spin_unlock_irq(&ploop->deferred_lock);
> +		queue_work(ploop->wq, &ploop->worker);
> +		goto out;
> +	}
> +
> +	cluster -= page_nr * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
> +
> +	to = kmap_atomic(piwb->bat_page);
> +	WARN_ON(to[cluster]);
> +	to[cluster] = cow->dst_cluster;
> +	kunmap_atomic(to);
> +
> +	/* Prevent double clearing of holes_bitmap bit on complete_cow() */
> +	cow->dst_cluster = BAT_ENTRY_NONE;
> +	spin_lock_irq(&ploop->deferred_lock);
> +	bio_list_add(&piwb->cow_list, cow->cluster_bio);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +out:
> +	return;
> +err_resource:
> +	complete_cow(cow, BLK_STS_RESOURCE);
> +}
> +
> +static void process_delta_wb(struct ploop *ploop, struct ploop_index_wb *piwb)
> +{
> +	struct bio_list cow_list = BIO_EMPTY_LIST;
> +	struct bio *cluster_bio;
> +	struct ploop_cow *cow;
> +
> +	if (bio_list_empty(&ploop->delta_cow_action_list))
> +		return;
> +	bio_list_merge(&cow_list, &ploop->delta_cow_action_list);
> +	bio_list_init(&ploop->delta_cow_action_list);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	while ((cluster_bio = bio_list_pop(&cow_list)) != NULL) {
> +		cow = cluster_bio->bi_private;
> +		if (unlikely(cluster_bio->bi_status != BLK_STS_OK)) {
> +			complete_cow(cow, cluster_bio->bi_status);
> +			continue;
> +		}
> +
> +		if (cow->dst_cluster == BAT_ENTRY_NONE) {
> +			/*
> +			 * Stage #1: assign dst_cluster and write data
> +			 * to top delta.
> +			 */
> +			submit_cluster_write(cow);
> +		} else {
> +			/*
> +			 * Stage #2: data is written to top delta.
> +			 * Update index.
> +			 */
> +			submit_cow_index_wb(cow, piwb);
> +		}
> +	}
> +
> +	spin_lock_irq(&ploop->deferred_lock);
> +}
> +
> +void restart_delta_cow(struct ploop *ploop)
> +{
> +	struct bio_list cow_list = BIO_EMPTY_LIST;
> +	struct bio *cluster_bio;
> +	struct ploop_cow *cow;
> +
> +	spin_lock_irq(&ploop->deferred_lock);
> +	bio_list_merge(&cow_list, &ploop->delta_cow_action_list);
> +	bio_list_init(&ploop->delta_cow_action_list);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	while ((cluster_bio = bio_list_pop(&cow_list)) != NULL) {
> +		cow = cluster_bio->bi_private;
> +		/* This may restart only normal cow */
> +		WARN_ON_ONCE(cow->end_fn != queue_or_fail);
> +		complete_cow(cow, BLK_STS_AGAIN);
> +	}
> +}
> +
> +/*
> + * This allocates a new cluster (if cluster wb is not pending yet),
> + * or tries to attach a bio to a planned page index wb.
> + *
> + * We want to update BAT indexes in batch, but we don't want to delay
> + * submitting data bios till the batch is assembled, submitted and completed.
> + * This function tries to submit data bios before the indexes are written
> + * on disk.
> + * The original bio->bi_end_io mustn't be called before the index wb is
> + * completed. We handle this in ploop_attach_end_action() via a specific
> + * callback, ploop_data_bio_end().
> + * Note: the cluster never becomes locked here, since the index update is
> + * called synchronously. Keep this in mind in case you make it async.
> + */
> +static bool locate_new_cluster_and_attach_bio(struct ploop *ploop,
> +					      struct ploop_index_wb *piwb,
> +					      unsigned int cluster,
> +					      unsigned int *dst_cluster,
> +					      struct bio *bio)
> +{
> +	bool bat_update_prepared = false;
> +	bool attached = false;
> +	unsigned int page_nr;
> +
> +	page_nr = bat_clu_to_page_nr(cluster);
> +
> +	if (piwb->page_nr == PAGE_NR_NONE) {
> +		/* No index wb in process. Prepare a new one */
> +		if (ploop_prepare_bat_update(ploop, page_nr, piwb) < 0) {
> +			bio->bi_status = BLK_STS_RESOURCE;
> +			goto error;
> +		}
> +		bat_update_prepared = true;
> +	}
> +
> +	if (piwb->page_nr != page_nr || piwb->type != PIWB_TYPE_ALLOC) {
> +		/* Another BAT page wb is in process */
> +		defer_bio(ploop, bio);
> +		goto out;
> +	}
> +
> +	if (ploop_alloc_cluster(ploop, piwb, cluster, dst_cluster)) {
> +		bio->bi_status = BLK_STS_IOERR;
> +		goto error;
> +	}
> +
> +	attached = ploop_attach_end_action(bio, piwb);
> +	if (!attached) {
> +		/*
> +		 * Could not prepare the data bio to be submitted before the
> +		 * index wb batch? Delay submitting. The good thing is that
> +		 * the cluster allocation has already been made, and it goes
> +		 * into the batch.
> +		 */
> +		defer_bio(ploop, bio);
> +	}
> +out:
> +	return attached;
> +error:
> +	/* Uninit piwb */
> +	if (bat_update_prepared)
> +		ploop_reset_bat_update(piwb);
> +	bio_endio(bio);
> +	return false;
> +}
> +
> +static int process_one_deferred_bio(struct ploop *ploop, struct bio *bio,
> +				    struct ploop_index_wb *piwb)
> +{
> +	unsigned int cluster, dst_cluster, level;
> +	sector_t sector = bio->bi_iter.bi_sector;
> +	bool ret;
> +
> +	/*
> +	 * Unlocked, since no one can update BAT in parallel:
> +	 * we update BAT only 1)from *this* kwork, and 2)from
> +	 * ploop_advance_local_after_bat_wb(), which we start
> +	 * and wait synchronously from *this* kwork.
> +	 */
> +	cluster = sector >> ploop->cluster_log;
> +	dst_cluster = ploop->bat_entries[cluster];
> +	level = ploop->bat_levels[cluster];
> +
> +	if (postpone_if_cluster_locked(ploop, bio, cluster))
> +		goto out;
> +	if (postpone_if_required_for_backup(ploop, bio, cluster))
> +		goto out;
> +
> +	if (op_is_discard(bio->bi_opf)) {
> +		handle_discard_bio(ploop, bio, cluster, dst_cluster);
> +		goto out;
> +	}
> +
> +	if (cluster_is_in_top_delta(ploop, cluster)) {
> +		/* Already mapped */
> +		goto queue;
> +	} else if (!op_is_write(bio->bi_opf)) {
> +		/*
> +		 * Simple read from secondary delta. May fail.
> +		 * (Also handles the case dst_cluster == BAT_ENTRY_NONE).
> +		 */
> +		initiate_delta_read(ploop, level, dst_cluster, bio);
> +		goto out;
> +	} else if (dst_cluster != BAT_ENTRY_NONE) {
> +		/*
> +		 * Read secondary delta and write to top delta. May fail.
> +		 * Yes, we could optimize the whole-cluster-write case and
> +		 * a lot of other corner cases, but we don't do that, since
> +		 * COW occurs very rarely when snapshots are used.
> +		 */
> +		initiate_cluster_cow(ploop, level, cluster, dst_cluster, bio);
> +		goto out;
> +	}
> +
> +	/* Cluster exists nowhere. Allocate it and set up the bio to outrun the index wb */
> +	ret = locate_new_cluster_and_attach_bio(ploop, piwb, cluster,
> +						&dst_cluster, bio);
> +	if (!ret)
> +		goto out;
> +queue:
> +	/* To improve: read lock may be avoided */
> +	read_lock_irq(&ploop->bat_rwlock);
> +	inc_nr_inflight(ploop, bio);
> +	read_unlock_irq(&ploop->bat_rwlock);
> +
> +	maybe_link_submitting_bio(ploop, bio, cluster);
> +
> +	remap_to_cluster(ploop, bio, dst_cluster);
> +	remap_to_origin(ploop, bio);
> +	generic_make_request(bio);
> +out:
> +	return 0;
> +}
> +
> +void ploop_submit_index_wb_sync(struct ploop *ploop,
> +				struct ploop_index_wb *piwb)
> +{
> +	struct block_device *bdev = ploop->origin_dev->bdev;
> +
> +	/* track_bio() will be called in ploop_bat_write_complete() */
> +	submit_bio(piwb->bat_bio);
> +	wait_for_completion(&piwb->comp);
> +
> +	if (!blk_queue_fua(bdev_get_queue(bdev))) {
> +		/*
> +		 * An error here does not mean that the cluster write failed,
> +		 * since ploop_map() could submit more bios in parallel.
> +		 * But it's not possible to tell them apart. Should we block
> +		 * ploop_map() while we do this?
> +		 */
> +		WARN_ON(blkdev_issue_flush(bdev, GFP_NOIO, NULL));
> +	}
> +}
> +
> +static void process_deferred_bios(struct ploop *ploop, struct bio_list *bios,
> +				  struct ploop_index_wb *piwb)
> +{
> +	struct bio *bio;
> +
> +	while ((bio = bio_list_pop(bios)))
> +		process_one_deferred_bio(ploop, bio, piwb);
> +}
> +
> +static int process_one_discard_bio(struct ploop *ploop, struct bio *bio,
> +				   struct ploop_index_wb *piwb)
> +{
> +	struct dm_ploop_endio_hook *h;
> +	unsigned int page_nr, cluster;
> +	bool bat_update_prepared;
> +	map_index_t *to;
> +
> +	WARN_ON(ploop->nr_deltas);
> +
> +	h = bio_to_endio_hook(bio);
> +	cluster = h->cluster;
> +	page_nr = bat_clu_to_page_nr(cluster);
> +	bat_update_prepared = false;
> +
> +	if (piwb->page_nr == PAGE_NR_NONE) {
> +		/* No index wb in process. Prepare a new one */
> +		if (ploop_prepare_bat_update(ploop, page_nr, piwb) < 0) {
> +			bio->bi_status = BLK_STS_RESOURCE;
> +			bio_endio(bio);
> +			goto out;
> +		}
> +		piwb->type = PIWB_TYPE_DISCARD;
> +		bat_update_prepared = true;
> +	}
> +
> +	if (piwb->page_nr != page_nr || piwb->type != PIWB_TYPE_DISCARD) {
> +		queue_discard_index_wb(ploop, bio);
> +		goto out;
> +	}
> +
> +	h->action = PLOOP_END_IO_DISCARD_INDEX_BIO;
> +
> +	/* Cluster index related to the page[page_nr] start */
> +	cluster -= piwb->page_nr * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
> +
> +	to = kmap_atomic(piwb->bat_page);
> +	if (WARN_ON_ONCE(!to[cluster])) {
> +		bio_io_error(bio);
> +		if (bat_update_prepared)
> +			ploop_reset_bat_update(piwb);
> +	} else {
> +		to[cluster] = 0;
> +		bio_list_add(&piwb->ready_data_bios, bio);
> +	}
> +	kunmap_atomic(to);
> +out:
> +	return 0;
> +}
> +
> +static void do_discard_cleanup(struct ploop *ploop)
> +{
> +	unsigned long cleanup_jiffies;
> +
> +	if (ploop->force_link_inflight_bios &&
> +	    !atomic_read(&ploop->nr_discard_bios)) {
> +		/* Pairs with barrier in ploop_discard_index_bio_end() */
> +		smp_rmb();
> +		cleanup_jiffies = READ_ONCE(ploop->pending_discard_cleanup);
> +
> +		if (time_after(jiffies, cleanup_jiffies + CLEANUP_DELAY * HZ)) {
> +			ploop->force_link_inflight_bios = false;
> +			force_defer_bio_count_dec(ploop);
> +		}
> +	}
> +}
> +
> +/*
> + * This processes discard bios waiting for index writeback after the
> + * REQ_DISCARD to the backing device has finished (they enter the
> + * PLOOP_END_IO_DISCARD_INDEX_BIO stage here).
> + *
> + * The device is switched back into !force_link_inflight_bios mode after
> + * the cleanup timeout has expired (see do_discard_cleanup()).
> + */
> +static void process_discard_bios(struct ploop *ploop, struct bio_list *bios,
> +				 struct ploop_index_wb *piwb)
> +{
> +	struct dm_ploop_endio_hook *h;
> +	struct bio *bio;
> +
> +	while ((bio = bio_list_pop(bios))) {
> +		h = bio_to_endio_hook(bio);
> +
> +		if (WARN_ON_ONCE(h->action != PLOOP_END_IO_DISCARD_BIO)) {
> +			bio_io_error(bio);
> +			continue;
> +		}
> +		process_one_discard_bio(ploop, bio, piwb);
> +	}
> +}
> +
> +void cancel_discard_bios(struct ploop *ploop)
> +{
> +	struct bio_list bio_list = BIO_EMPTY_LIST;
> +	struct bio *bio;
> +
> +	spin_lock_irq(&ploop->deferred_lock);
> +	bio_list_merge(&bio_list, &ploop->discard_bios);
> +	bio_list_init(&ploop->discard_bios);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	while ((bio = bio_list_pop(&bio_list)) != NULL) {
> +		bio->bi_status = BLK_STS_NOTSUPP;
> +		bio_endio(bio);
> +	}
> +}
> +
> +/* Remove the hook from the tree and collect the bio and its endio bio chain */
> +void unlink_postponed_backup_endio(struct ploop *ploop,
> +				   struct bio_list *bio_list,
> +				   struct dm_ploop_endio_hook *h)
> +{
> +	struct push_backup *pb = ploop->pb;
> +	struct bio *bio;
> +
> +	/* Remove from tree and queue attached bios */
> +	unlink_endio_hook(ploop, &pb->rb_root, h, bio_list);
> +
> +	/* Unlink from pb->pending */
> +	list_del(&h->list);
> +	/* Zero the {list,piwb} union as it may be used again later */
> +	memset(&h->list, 0, sizeof(h->list));
> +
> +	/* Queue the related bio itself */
> +	bio = dm_bio_from_per_bio_data(h, sizeof(*h));
> +	bio_list_add(bio_list, bio);
> +}
> +
> +void cleanup_backup(struct ploop *ploop)
> +{
> +	struct bio_list bio_list = BIO_EMPTY_LIST;
> +	struct push_backup *pb = ploop->pb;
> +	struct dm_ploop_endio_hook *h;
> +	struct rb_node *node;
> +
> +	spin_lock_irq(&ploop->pb_lock);
> +	/* Take bat_rwlock for visibility in ploop_map() */
> +	write_lock(&ploop->bat_rwlock);
> +	pb->alive = false;
> +	write_unlock(&ploop->bat_rwlock);
> +
> +	while ((node = pb->rb_root.rb_node) != NULL) {
> +		h = rb_entry(node, struct dm_ploop_endio_hook, node);
> +		unlink_postponed_backup_endio(ploop, &bio_list, h);
> +	}
> +	spin_unlock_irq(&ploop->pb_lock);
> +
> +	if (!bio_list_empty(&bio_list))
> +		defer_bio_list(ploop, &bio_list);
> +
> +	del_timer_sync(&pb->deadline_timer);
> +}
> +
> +static void check_backup_deadline(struct ploop *ploop)
> +{
> +	u64 deadline, now = get_jiffies_64();
> +	struct push_backup *pb = ploop->pb;
> +
> +	if (likely(!pb || !pb->alive))
> +		return;
> +
> +	spin_lock_irq(&ploop->pb_lock);
> +	deadline = READ_ONCE(pb->deadline_jiffies);
> +	spin_unlock_irq(&ploop->pb_lock);
> +
> +	if (time_before64(now, deadline))
> +		return;
> +
> +	cleanup_backup(ploop);
> +}
> +
> +static void check_services_timeout(struct ploop *ploop)
> +{
> +	do_discard_cleanup(ploop);
> +	check_backup_deadline(ploop);
> +}
> +
> +void do_ploop_work(struct work_struct *ws)
> +{
> +	struct ploop *ploop = container_of(ws, struct ploop, worker);
> +	struct bio_list deferred_bios = BIO_EMPTY_LIST;
> +	struct bio_list discard_bios = BIO_EMPTY_LIST;
> +	struct ploop_index_wb piwb;
> +
> +	/*
> +	 * In piwb we collect requests for index updates which are
> +	 * related to the same page (of PAGE_SIZE), and then we submit
> +	 * all of them in a batch in ploop_submit_index_wb_sync().
> +	 *
> +	 * Currently, it's impossible to submit updates of two BAT pages
> +	 * in parallel, since the update uses the global ploop->bat_page.
> +	 * Note that process_deferred_cmd() expects there is no
> +	 * pending index wb.
> +	 */
> +	ploop_index_wb_init(&piwb, ploop);
> +
> +	spin_lock_irq(&ploop->deferred_lock);
> +	process_deferred_cmd(ploop, &piwb);
> +	process_delta_wb(ploop, &piwb);
> +
> +	bio_list_merge(&deferred_bios, &ploop->deferred_bios);
> +	bio_list_merge(&discard_bios, &ploop->discard_bios);
> +	bio_list_init(&ploop->deferred_bios);
> +	bio_list_init(&ploop->discard_bios);
> +	spin_unlock_irq(&ploop->deferred_lock);
> +
> +	process_deferred_bios(ploop, &deferred_bios, &piwb);
> +	process_discard_bios(ploop, &discard_bios, &piwb);
> +
> +	if (piwb.page_nr != PAGE_NR_NONE) {
> +		/* Index wb was prepared -- submit it and wait for it */
> +		ploop_submit_index_wb_sync(ploop, &piwb);
> +		ploop_reset_bat_update(&piwb);
> +	}
> +
> +	check_services_timeout(ploop);
> +}
> +
> +static bool should_defer_bio(struct ploop *ploop, struct bio *bio,
> +			     unsigned int cluster)
> +{
> +	struct push_backup *pb = ploop->pb;
> +
> +	lockdep_assert_held(&ploop->bat_rwlock);
> +
> +	if (ploop->force_defer_bio_count)
> +		return true;
> +	if (pb && pb->alive && op_is_write(bio->bi_opf))
> +		return test_bit(cluster, pb->ppb_map);
> +	return false;
> +}
> +
> +/*
> + * ploop_map() tries to map a bio to the origin device or delays it.
> + * It never modifies ploop->bat_entries or other cached
> + * metadata: that is done in do_ploop_work() only.
> + */
> +int ploop_map(struct dm_target *ti, struct bio *bio)
> +{
> +	struct ploop *ploop = ti->private;
> +	unsigned int cluster, dst_cluster;
> +	unsigned long flags;
> +	bool in_top_delta;
> +
> +	ploop_init_end_io(ploop, bio);
> +
> +	if (bio_sectors(bio)) {
> +		if (op_is_discard(bio->bi_opf))
> +			return ploop_map_discard(ploop, bio);
> +		if (ploop_bio_cluster(ploop, bio, &cluster) < 0)
> +			return DM_MAPIO_KILL;
> +
> +		/* map it */
> +		read_lock_irqsave(&ploop->bat_rwlock, flags);
> +		dst_cluster = ploop->bat_entries[cluster];
> +		in_top_delta = cluster_is_in_top_delta(ploop, cluster);
> +		if (unlikely(should_defer_bio(ploop, bio, cluster))) {
> +			/* defer all bios */
> +			in_top_delta = false;
> +			dst_cluster = 0;
> +		}
> +		if (in_top_delta)
> +			inc_nr_inflight(ploop, bio);
> +		read_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +
> +		if (!in_top_delta) {
> +			if (op_is_write(bio->bi_opf) || dst_cluster != BAT_ENTRY_NONE) {
> +				defer_bio(ploop, bio);
> +			} else {
> +				zero_fill_bio(bio);
> +				bio_endio(bio);
> +			}
> +
> +			return DM_MAPIO_SUBMITTED;
> +		}
> +
> +		remap_to_cluster(ploop, bio, dst_cluster);
> +	}
> +
> +	remap_to_origin(ploop, bio);
> +
> +	return DM_MAPIO_REMAPPED;
> +}
> +
> +int ploop_endio(struct dm_target *ti, struct bio *bio, blk_status_t *err)
> +{
> +	struct dm_ploop_endio_hook *h = bio_to_endio_hook(bio);
> +	struct ploop *ploop = ti->private;
> +	int ret = DM_ENDIO_DONE;
> +
> +	if (h->ref_index != PLOOP_REF_INDEX_INVALID) {
> +		/*
> +		 * This function may be called twice for discard and
> +		 * for data bios. Check ref_index so that @bio is not
> +		 * tracked twice.
> +		 */
> +		track_bio(ploop, bio);
> +	}
> +	/*
> +	 * This function is called from the very beginning
> +	 * of bio->bi_end_io (which is dm.c::clone_endio()).
> +	 *
> +	 * A DM_ENDIO_DONE return value means handling went OK.
> +	 * DM_ENDIO_INCOMPLETE tells the caller to stop end_io
> +	 * processing, and that we are going to call bi_end_io
> +	 * directly again later. This function (ploop_endio)
> +	 * will then be called again, too!
> +	 * See dm.c::clone_endio() for the details.
> +	 */
> +	if (h->action == PLOOP_END_IO_DATA_BIO)
> +		ret = ploop_data_bio_end(bio);
> +
> +	if (h->action == PLOOP_END_IO_DISCARD_BIO)
> +		ret = ploop_discard_bio_end(ploop, bio);
> +
> +	if (h->action == PLOOP_END_IO_DISCARD_INDEX_BIO)
> +		ret = ploop_discard_index_bio_end(ploop, bio);
> +
> +	if (ret == DM_ENDIO_DONE) {
> +		maybe_unlink_completed_bio(ploop, bio);
> +		dec_nr_inflight(ploop, bio);
> +	}
> +
> +	return ret;
> +}
> +
> +/*
> + * Prepare simple index writeback without attached data bios.
> + * If @dst_cluster is passed, this tries to allocate a new index
> + * instead of the existing one. In that case, managing the old
> + * bat_entries[@cluster] and the related holes_bitmap bit is the
> + * caller's duty.
> + */
> +int ploop_prepare_reloc_index_wb(struct ploop *ploop,
> +				 struct ploop_index_wb *piwb,
> +				 unsigned int cluster,
> +				 unsigned int *dst_cluster)
> +{
> +	unsigned int page_nr = bat_clu_to_page_nr(cluster);
> +
> +	if (piwb->page_nr != PAGE_NR_NONE ||
> +	    ploop_prepare_bat_update(ploop, page_nr, piwb))
> +		goto out_eio;
> +	if (dst_cluster) {
> +		/*
> +		 * For ploop_advance_local_after_bat_wb(): do not care
> +		 * whether bat_entries[@cluster] is set. Zero bat_page[@cluster]
> +		 * to make ploop_alloc_cluster() allocate a new dst_cluster
> +		 * from holes_bitmap.
> +		 */
> +		piwb->type = PIWB_TYPE_RELOC;
> +		ploop_bat_page_zero_cluster(ploop, piwb, cluster);
> +		if (ploop_alloc_cluster(ploop, piwb, cluster, dst_cluster))
> +			goto out_reset;
> +	}
> +
> +	return 0;
> +
> +out_reset:
> +	ploop_reset_bat_update(piwb);
> +out_eio:
> +	return -EIO;
> +}
> diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
> new file mode 100644
> index 000000000000..69c215e1bc66
> --- /dev/null
> +++ b/drivers/md/dm-ploop-target.c
> @@ -0,0 +1,318 @@
> +// SPDX-License-Identifier: GPL-2.0-only
> +#include "dm.h"
> +#include <linux/buffer_head.h>
> +#include <linux/dm-io.h>
> +#include <linux/dm-kcopyd.h>
> +#include <linux/init.h>
> +#include <linux/module.h>
> +#include <linux/file.h>
> +#include <linux/slab.h>
> +#include <linux/vmalloc.h>
> +#include "dm-ploop.h"
> +
> +#define DM_MSG_PREFIX "ploop"
> +
> +static void inflight_bios_ref_exit0(struct percpu_ref *ref)
> +{
> +	struct ploop *ploop = container_of(ref, struct ploop,
> +					   inflight_bios_ref[0]);
> +	complete(&ploop->inflight_bios_ref_comp);
> +}
> +
> +static void inflight_bios_ref_exit1(struct percpu_ref *ref)
> +{
> +	struct ploop *ploop = container_of(ref, struct ploop,
> +					   inflight_bios_ref[1]);
> +	complete(&ploop->inflight_bios_ref_comp);
> +}
> +
> +static void ploop_flush_workqueue(struct ploop *ploop)
> +{
> +	bool again = true;
> +
> +	while (again) {
> +		flush_workqueue(ploop->wq);
> +
> +		spin_lock_irq(&ploop->deferred_lock);
> +		again = ploop->deferred_cmd || !bio_list_empty(&ploop->deferred_bios);
> +		spin_unlock_irq(&ploop->deferred_lock);
> +		if (again)
> +			schedule_timeout_uninterruptible(HZ);
> +	}
> +}
> +
> +static void ploop_destroy(struct ploop *ploop)
> +{
> +	int i;
> +
> +	if (ploop->pb)
> +		ploop_free_pb(ploop->pb);
> +	if (ploop->wq) {
> +		ploop_flush_workqueue(ploop);
> +		destroy_workqueue(ploop->wq);
> +	}
> +	if (ploop->origin_dev) {
> +		WARN_ON(blkdev_issue_flush(ploop->origin_dev->bdev, GFP_NOIO, NULL));
> +		dm_put_device(ploop->ti, ploop->origin_dev);
> +	}
> +
> +	for (i = 0; i < 2; i++)
> +		percpu_ref_exit(&ploop->inflight_bios_ref[i]);
> +	/* Nobody uses it after destroy_workqueue() */
> +	while (ploop->nr_deltas-- > 0)
> +		fput(ploop->deltas[ploop->nr_deltas]);
> +	WARN_ON(!RB_EMPTY_ROOT(&ploop->exclusive_bios_rbtree));
> +	WARN_ON(!RB_EMPTY_ROOT(&ploop->inflight_bios_rbtree));
> +	kfree(ploop->deltas);
> +	kvfree(ploop->bat_levels);
> +	kvfree(ploop->holes_bitmap);
> +	kvfree(ploop->tracking_bitmap);
> +	vfree(ploop->hdr);
> +	kfree(ploop);
> +}
> +
> +static int ploop_check_origin_dev(struct dm_target *ti, struct ploop *ploop)
> +{
> +	struct block_device *bdev = ploop->origin_dev->bdev;
> +	int r;
> +
> +	if (bdev->bd_block_size < PAGE_SIZE) {
> +		ti->error = "Origin dev has too small block size";
> +		return -EINVAL;
> +	}
> +
> +	r = ploop_read_metadata(ti, ploop);
> +	if (r) {
> +		ti->error = "Can't read ploop header";
> +		return r;
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * <cluster_log> <data dev> [<delta fd> ...]
> + */
> +static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
> +{
> +	percpu_ref_func_t *release;
> +	struct ploop *ploop;
> +	int i, ret;
> +
> +	if (argc < 2)
> +		return -EINVAL;
> +
> +	ploop = kzalloc(sizeof(*ploop), GFP_KERNEL);
> +	if (!ploop) {
> +		ti->error = "Error allocating ploop structure";
> +		return -ENOMEM;
> +	}
> +
> +	rwlock_init(&ploop->bat_rwlock);
> +	mutex_init(&ploop->ctl_mutex);
> +	spin_lock_init(&ploop->deferred_lock);
> +	spin_lock_init(&ploop->pb_lock);
> +
> +	bio_list_init(&ploop->deferred_bios);
> +	bio_list_init(&ploop->discard_bios);
> +	INIT_LIST_HEAD(&ploop->cluster_lk_list);
> +	bio_list_init(&ploop->delta_cow_action_list);
> +	atomic_set(&ploop->nr_discard_bios, 0);
> +
> +	INIT_WORK(&ploop->worker, do_ploop_work);
> +
> +	for (i = 0; i < 2; i++) {
> +		release = i ? inflight_bios_ref_exit1 : inflight_bios_ref_exit0;
> +		if (percpu_ref_init(&ploop->inflight_bios_ref[i], release,
> +				    0, GFP_KERNEL)) {
> +			ret = -ENOMEM;
> +			ti->error = "could not alloc percpu_ref";
> +			goto err;
> +		}
> +	}
> +
> +	ti->private = ploop;
> +	ploop->ti = ti;
> +
> +	if (kstrtou32(argv[0], 10, &ploop->cluster_log) < 0) {
> +		ret = -EINVAL;
> +		ti->error = "could not parse cluster_log";
> +		goto err;
> +	}
> +
> +	/*
> +	 * We do not add FMODE_EXCL, because open_table_device() further
> +	 * down the call stack adds it unconditionally.
> +	 */
> +	ret = dm_get_device(ti, argv[1], dm_table_get_mode(ti->table),
> +			    &ploop->origin_dev);
> +	if (ret) {
> +		ti->error = "Error opening origin device";
> +		goto err;
> +	}
> +
> +	ret = ploop_check_origin_dev(ti, ploop);
> +	if (ret) {
> +		/* ploop_check_origin_dev() assigns ti->error */
> +		goto err;
> +	}
> +
> +	ret = dm_set_target_max_io_len(ti, 1 << ploop->cluster_log);
> +	if (ret) {
> +		ti->error = "could not set max_io_len";
> +		goto err;
> +	}
> +
> +	ret = -ENOMEM;
> +
> +	ploop->wq = alloc_ordered_workqueue("dm-" DM_MSG_PREFIX, WQ_MEM_RECLAIM);
> +	if (!ploop->wq) {
> +		ti->error = "could not create workqueue for metadata object";
> +		goto err;
> +	}
> +
> +	ploop->exclusive_bios_rbtree = RB_ROOT;
> +	ploop->inflight_bios_rbtree = RB_ROOT;
> +	ret = -EINVAL;
> +	for (i = 2; i < argc; i++) {
> +		ret = ploop_add_delta(ploop, argv[i]);
> +		if (ret < 0)
> +			goto err;
> +	}
> +
> +	ti->per_io_data_size = sizeof(struct dm_ploop_endio_hook);
> +	ti->num_flush_bios = 1;
> +	ti->flush_supported = true;
> +	ti->num_discard_bios = 1;
> +	ti->discards_supported = true;
> +	return 0;
> +
> +err:
> +	ploop_destroy(ploop);
> +	return ret;
> +}
> +
> +static void ploop_dtr(struct dm_target *ti)
> +{
> +	ploop_destroy(ti->private);
> +}
> +
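> +/*
> + * Discards are accepted at cluster granularity: at most one cluster
> + * (1 << cluster_log sectors) per discard bio, with discard_granularity
> + * set to the cluster size in bytes.
> + */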
> +static void ploop_io_hints(struct dm_target *ti, struct queue_limits *limits)
> +{
> +	struct ploop *ploop = ti->private;
> +	unsigned int cluster_log = ploop->cluster_log;
> +
> +	/* TODO: take into account the origin_dev */
> +	limits->max_discard_sectors = 1 << cluster_log;
> +	limits->max_hw_discard_sectors = 1 << cluster_log;
> +	limits->discard_granularity = 1 << (cluster_log + SECTOR_SHIFT);
> +	limits->discard_alignment = 0;
> +	limits->discard_misaligned = 0;
> +}
> +
> +static sector_t get_dev_size(struct dm_dev *dev)
> +{
> +	return i_size_read(dev->bdev->bd_inode) >> SECTOR_SHIFT;
> +}
> +
> +static int ploop_iterate_devices(struct dm_target *ti,
> +				 iterate_devices_callout_fn fn, void *data)
> +{
> +	struct ploop *ploop = ti->private;
> +	sector_t size;
> +
> +	size = get_dev_size(ploop->origin_dev);
> +
> +	return fn(ti, ploop->origin_dev, 0, size, data);
> +}
> +
> +static void ploop_postsuspend(struct dm_target *ti)
> +{
> +	struct ploop *ploop = ti->private;
> +
> +	ploop_flush_workqueue(ploop);
> +
> +	blkdev_issue_flush(ploop->origin_dev->bdev, GFP_NOIO, NULL);
> +}
> +
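> +/*
> + * Status is reported as "<origin dev> <nr deltas> v2 <cluster sectors>
> + * <flags>", where flags are: t - tracking, n - noresume, b/B - push
> + * backup alive/dead, o - none of these. E.g. (illustrative):
> + * "7:0 2 v2 2048 o".
> + */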
> +static void ploop_status(struct dm_target *ti, status_type_t type,
> +			 unsigned int status_flags, char *result,
> +			 unsigned int maxlen)
> +{
> +	struct ploop *ploop = ti->private;
> +	char stat[16] = { 0 }, *p = stat;
> +	ssize_t sz = 0;
> +
> +	read_lock_irq(&ploop->bat_rwlock);
> +	if (ploop->tracking_bitmap)
> +		p += sprintf(p, "t");
> +	if (ploop->noresume)
> +		p += sprintf(p, "n");
> +	if (ploop->pb) {
> +		if (ploop->pb->alive)
> +			p += sprintf(p, "b");
> +		else
> +			p += sprintf(p, "B");
> +	}
> +	if (p == stat)
> +		p += sprintf(p, "o");
> +	BUG_ON(p - stat >= sizeof(stat));
> +	DMEMIT("%s %u v2 %u %s", ploop->origin_dev->name, ploop->nr_deltas,
> +		1 << ploop->cluster_log, stat);
> +	read_unlock_irq(&ploop->bat_rwlock);
> +}
> +
> +static int ploop_preresume(struct dm_target *ti)
> +{
> +	struct ploop *ploop = ti->private;
> +	int ret = 0;
> +
> +	mutex_lock(&ploop->ctl_mutex);
> +	if (ploop->noresume)
> +		ret = -EAGAIN;
> +	mutex_unlock(&ploop->ctl_mutex);
> +	return ret;
> +}
> +
> +/*----------------------------------------------------------------*/
> +
> +static struct target_type ploop_target = {
> +	.name = "ploop",
> +	.version = {1, 0, 0},
> +	.module = THIS_MODULE,
> +	.ctr = ploop_ctr,
> +	.dtr = ploop_dtr,
> +	.map = ploop_map,
> +	.end_io = ploop_endio,
> +	.message = ploop_message,
> +	.io_hints = ploop_io_hints,
> +	.iterate_devices = ploop_iterate_devices,
> +	.postsuspend = ploop_postsuspend,
> +	.preresume = ploop_preresume,
> +	.status = ploop_status,
> +};
> +
> +static int __init dm_ploop_init(void)
> +{
> +	int r;
> +
> +	r = dm_register_target(&ploop_target);
> +	if (r) {
> +		DMERR("ploop target registration failed: %d", r);
> +		return r;
> +	}
> +
> +	return 0;
> +}
> +
> +static void __exit dm_ploop_exit(void)
> +{
> +	dm_unregister_target(&ploop_target);
> +}
> +
> +module_init(dm_ploop_init);
> +module_exit(dm_ploop_exit);
> +
> +MODULE_AUTHOR("Kirill Tkhai <ktkhai at virtuozzo.com>");
> +MODULE_LICENSE("GPL");
> diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
> new file mode 100644
> index 000000000000..ea6917d13835
> --- /dev/null
> +++ b/drivers/md/dm-ploop.h
> @@ -0,0 +1,388 @@
> +#ifndef __DM_PLOOP_H
> +#define __DM_PLOOP_H
> +
> +#include <linux/device-mapper.h>
> +#include <linux/bio.h>
> +
> +#define PLOOP_MAP_OFFSET 16
> +typedef u32 map_index_t;
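> +/*
> + * The 64-byte on-disk header occupies the first PLOOP_MAP_OFFSET (16)
> + * map_index_t slots of the first BAT cluster; BAT entries follow it.
> + */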
> +
> +#define SIGNATURE_DISK_IN_USE           0x746F6E59
> +
> +#pragma pack(push, 1)
> +struct ploop_pvd_header {
> +	__u8  m_Sig[16];	/* Signature */
> +	__u32 m_Type;		/* Disk type */
> +	__u32 m_Heads;		/* heads count */
> +	__u32 m_Cylinders;	/* tracks count */
> +	__u32 m_Sectors;	/* Sectors per track count */
> +	__u32 m_Size;		/* Size of disk in tracks */
> +	union {			/* Size of disk in 512-byte sectors */
> +		struct {
> +			__u32 m_SizeInSectors_v1;
> +			__u32 Unused;
> +		};
> +		__u64 m_SizeInSectors_v2;
> +	};
> +	__u32 m_DiskInUse;	/* Disk in use */
> +	__u32 m_FirstBlockOffset; /* First data block offset (in sectors) */
> +	__u32 m_Flags;		/* Misc flags */
> +	__u8  m_Reserved[8];	/* Reserved */
> +};
> +#pragma pack(pop)
> +
> +struct ploop_cmd {
> +#define PLOOP_CMD_RESIZE		1
> +#define PLOOP_CMD_ADD_DELTA		2
> +#define PLOOP_CMD_MERGE_SNAPSHOT	3
> +#define PLOOP_CMD_NOTIFY_DELTA_MERGED	4
> +#define PLOOP_CMD_SWITCH_TOP_DELTA	5
> +#define PLOOP_CMD_UPDATE_DELTA_INDEX	6
> +#define PLOOP_CMD_TRACKING_START	7
> +#define PLOOP_CMD_FLIP_UPPER_DELTAS	8
> +#define PLOOP_CMD_SET_PUSH_BACKUP	9
> +	struct completion comp;
> +	struct ploop *ploop;
> +	unsigned int type;
> +	int retval;
> +	union {
> +		struct {
> +			u64 new_size;
> +			/* Preallocated data */
> +			void *hdr; /* hdr and bat_entries, stored contiguously */
> +			void *bat_levels;
> +			void *holes_bitmap;
> +#define PLOOP_GROW_STAGE_INITIAL	0
> +			unsigned int stage;
> +			unsigned int nr_bat_entries;
> +			unsigned int hb_nr;
> +			unsigned int end_dst_cluster;
> +			unsigned int nr_old_bat_clu;
> +			unsigned int cluster, dst_cluster;
> +			struct bio *bio;
> +		} resize;
> +		struct {
> +			struct file *file;
> +			struct file **deltas;
> +			void *hdr; /* hdr and bat_entries, stored contiguously */
> +			unsigned int raw_clusters;
> +			bool is_raw;
> +		} add_delta;
> +		struct {
> +#define NR_MERGE_BIOS			64
> +			atomic_t nr_available;
> +			unsigned int cluster; /* Currently iterated cluster */
> +			bool do_repeat;
> +		} merge;
> +		struct {
> +			void *hdr; /* hdr and bat_entries, stored contiguously */
> +			u8 level;
> +			bool forward;
> +		} notify_delta_merged;
> +		struct {
> +			struct dm_dev *origin_dev;
> +			struct file **deltas;
> +		} switch_top_delta;
> +		struct {
> +			u8 level;
> +			const char *map;
> +		} update_delta_index;
> +		struct {
> +			void *tracking_bitmap;
> +			unsigned int tb_nr;
> +		} tracking_start;
> +		struct {
> +			struct dm_dev *origin_dev;
> +			struct file *file;
> +		} flip_upper_deltas;
> +		struct {
> +			struct push_backup *pb;
> +		} set_push_backup;
> +	};
> +};
> +
> +#define PAGE_NR_NONE		UINT_MAX
> +/* We can't use 0 for unmapped clusters, since a RAW image references cluster 0 */
> +#define BAT_ENTRY_NONE		UINT_MAX
> +
> +#define BAT_LEVEL_TOP		U8_MAX
> +#define CLEANUP_DELAY		20
> +#define BACKUP_DEADLINE		42
> +
> +#define PLOOP_BIOS_HTABLE_BITS	8
> +#define PLOOP_BIOS_HTABLE_SIZE	(1 << PLOOP_BIOS_HTABLE_BITS)
> +
> +enum piwb_type {
> +	PIWB_TYPE_ALLOC = 0,	/* Allocation of new clusters */
> +	PIWB_TYPE_RELOC,	/* Relocation of cluster (on BAT grow) */
> +	PIWB_TYPE_DISCARD,	/* Zeroing index on discard */
> +};
> +
> +struct ploop_index_wb {
> +	struct ploop *ploop;
> +	struct completion comp;
> +	enum piwb_type type;
> +	spinlock_t lock;
> +	struct page *bat_page;
> +	struct bio *bat_bio;
> +	struct bio_list ready_data_bios;
> +	struct bio_list cow_list;
> +	atomic_t count;
> +	bool completed;
> +	int bi_status;
> +	unsigned int page_nr;
> +};
> +
> +struct push_backup {
> +	struct ploop *ploop;
> +	u8 uuid[17];
> +	bool alive;
> +
> +	void *ppb_map;
> +
> +	u64 deadline_jiffies;
> +	struct timer_list deadline_timer;
> +
> +	/* This tree is for looking up a delayed bio by cluster */
> +	struct rb_root rb_root;
> +
> +	struct list_head pending;
> +};
> +
> +struct ploop {
> +	struct dm_target *ti;
> +
> +	struct dm_dev *origin_dev;
> +	struct ploop_pvd_header *hdr;
> +	unsigned int *bat_entries;
> +	u8 *bat_levels;
> +	struct file **deltas;
> +	u8 nr_deltas;
> +	unsigned int nr_bat_entries;
> +	unsigned int cluster_log; /* In sectors */
> +	/*
> +	 * Bits are indexed by the absolute cluster number from the start
> +	 * of the file. BAT-related clusters are also included, and their
> +	 * bits must be zeroed.
> +	 */
> +	void *holes_bitmap; /* Clearing a bit occurs from kwork only */
> +	unsigned int hb_nr; /* holes_bitmap size in bits */
> +	rwlock_t bat_rwlock;
> +
> +	void *tracking_bitmap;
> +	unsigned int tb_nr; /* tracking_bitmap size in bits */
> +	unsigned int tb_cursor;
> +
> +	int force_defer_bio_count; /* Protected by bat_rwlock */
> +	bool force_link_inflight_bios;
> +	/*
> +	 * Tree of non-exclusively submitted bios. This is needed
> +	 * by discard to check that nobody uses the cluster being
> +	 * discarded. Bios are linked here only when the above
> +	 * force_link_inflight_bios is enabled.
> +	 */
> +	struct rb_root inflight_bios_rbtree;
> +	/*
> +	 * Tree of exclusively submitted bios. This allows
> +	 * delaying bios targeting a given cluster.
> +	 */
> +	struct rb_root exclusive_bios_rbtree;
> +
> +	atomic_t nr_discard_bios;
> +	unsigned long pending_discard_cleanup;
> +
> +	struct workqueue_struct *wq;
> +	struct work_struct worker;
> +
> +	struct completion inflight_bios_ref_comp;
> +	struct percpu_ref inflight_bios_ref[2];
> +	unsigned int inflight_bios_ref_index:1;
> +
> +	spinlock_t deferred_lock;
> +	struct bio_list deferred_bios;
> +	struct bio_list discard_bios;
> +
> +	struct mutex ctl_mutex;
> +	struct ploop_cmd *deferred_cmd;
> +
> +	/*
> +	 * List of locked clusters (no write is possible).
> +	 * TODO: make @cluster_lk_list a hash table or something similar.
> +	 */
> +	struct list_head cluster_lk_list;
> +
> +	/* List of COW requests requiring action. */
> +	struct bio_list delta_cow_action_list;
> +
> +	/* Resume is prohibited */
> +	bool noresume;
> +
> +	/* Maintenance in progress */
> +	bool maintaince;
> +
> +	/* Push Backup */
> +	struct push_backup *pb;
> +	spinlock_t pb_lock;
> +};
> +
> +struct dm_ploop_endio_hook {
> +	union {
> +		struct ploop_index_wb *piwb;
> +		struct list_head list;
> +	};
> +	struct rb_node node;
> +	/* List of bios which will be queued when this bio completes */
> +	struct bio *endio_bio_list;
> +
> +	unsigned int cluster;
> +
> +#define PLOOP_END_IO_NONE		0
> +#define PLOOP_END_IO_DATA_BIO		1
> +#define PLOOP_END_IO_DISCARD_BIO	2
> +#define PLOOP_END_IO_DISCARD_INDEX_BIO	3
> +	unsigned int action:2;
> +	/*
> +	 * 0 and 1 are related to inflight_bios_ref[],
> +	 * 2 means index is not assigned.
> +	 */
> +#define PLOOP_REF_INDEX_INVALID	2
> +	unsigned int ref_index:2;
> +};
> +
> +static inline bool ploop_is_ro(struct ploop *ploop)
> +{
> +	return (dm_table_get_mode(ploop->ti->table) & FMODE_WRITE) == 0;
> +}
> +
> +static inline void remap_to_origin(struct ploop *ploop, struct bio *bio)
> +{
> +	bio_set_dev(bio, ploop->origin_dev->bdev);
> +}
> +
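> +/*
> + * Keep the in-cluster offset and replace the cluster number.
> + * Illustrative example (assuming cluster_log == 11, i.e. 1M clusters):
> + * a bio at sector 2 * 2048 + 5, remapped to dst cluster 7, ends up at
> + * sector 7 * 2048 + 5.
> + */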
> +static inline void remap_to_cluster(struct ploop *ploop, struct bio *bio,
> +				    unsigned int cluster)
> +{
> +	bio->bi_iter.bi_sector &= ((1 << ploop->cluster_log) - 1);
> +	bio->bi_iter.bi_sector |= ((sector_t)cluster << ploop->cluster_log);
> +}
> +
> +static inline bool whole_cluster(struct ploop *ploop, struct bio *bio)
> +{
> +	if (bio_sectors(bio) != (1 << ploop->cluster_log))
> +		return false;
> +	/*
> +	 * There is no special meaning in bio_end_sector();
> +	 * it's just a suitable, existing primitive.
> +	 */
> +	return !(bio_end_sector(bio) & ((1 << ploop->cluster_log) - 1));
> +}
> +
> +static inline void ploop_hole_set_bit(unsigned long nr, struct ploop *ploop)
> +{
> +	if (!WARN_ON_ONCE(nr >= ploop->hb_nr))
> +		set_bit(nr, ploop->holes_bitmap);
> +}
> +
> +static inline void ploop_hole_clear_bit(unsigned int nr, struct ploop *ploop)
> +{
> +	if (!WARN_ON_ONCE(nr >= ploop->hb_nr))
> +		clear_bit(nr, ploop->holes_bitmap);
> +}
> +
> +static inline unsigned int nr_pages_in_cluster(struct ploop *ploop)
> +{
> +	return 1 << (ploop->cluster_log + 9 - PAGE_SHIFT);
> +}
> +
> +/* Get the number of clusters occupied by the hdr and BAT */
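> +/*
> + * Worked example (assuming 1M clusters, i.e. cluster_log == 11): a 256G
> + * image has 262144 BAT entries, so (16 + 262144) * 4 bytes is just over
> + * one cluster and DIV_ROUND_UP() reserves 2 clusters for hdr + BAT.
> + */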
> +static inline unsigned int ploop_nr_bat_clusters(struct ploop *ploop,
> +						 unsigned int nr_bat_entries)
> +{
> +	unsigned long size, bat_clusters;
> +
> +	size = (PLOOP_MAP_OFFSET + nr_bat_entries) * sizeof(map_index_t);
> +	bat_clusters = DIV_ROUND_UP(size, 1 << (ploop->cluster_log + 9));
> +
> +	return bat_clusters;
> +}
> +
> +static inline bool cluster_is_in_top_delta(struct ploop *ploop,
> +					   unsigned int cluster)
> +{
> +	if (WARN_ON(cluster >= ploop->nr_bat_entries))
> +		return false;
> +	if (ploop->bat_entries[cluster] == BAT_ENTRY_NONE ||
> +	    ploop->bat_levels[cluster] < BAT_LEVEL_TOP)
> +		return false;
> +	return true;
> +}
> +
> +static inline void force_defer_bio_count_inc(struct ploop *ploop)
> +{
> +	unsigned long flags;
> +
> +	write_lock_irqsave(&ploop->bat_rwlock, flags);
> +	WARN_ON_ONCE(ploop->force_defer_bio_count++ < 0);
> +	write_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +}
> +
> +static inline void force_defer_bio_count_dec(struct ploop *ploop)
> +{
> +	unsigned long flags;
> +
> +	write_lock_irqsave(&ploop->bat_rwlock, flags);
> +	WARN_ON_ONCE(--ploop->force_defer_bio_count < 0);
> +	write_unlock_irqrestore(&ploop->bat_rwlock, flags);
> +}
> +
> +extern void __track_bio(struct ploop *ploop, struct bio *bio);
> +
> +static inline void track_bio(struct ploop *ploop, struct bio *bio)
> +{
> +	/* See comment in process_tracking_start() about visibility */
> +	if (unlikely(ploop->tracking_bitmap))
> +		__track_bio(ploop, bio);
> +}
> +
> +extern int ploop_add_delta(struct ploop *ploop, const char *arg);
> +extern void defer_bio(struct ploop *ploop, struct bio *bio);
> +extern void defer_bio_list(struct ploop *ploop, struct bio_list *bio_list);
> +extern void do_ploop_work(struct work_struct *ws);
> +extern void process_deferred_cmd(struct ploop *ploop,
> +			struct ploop_index_wb *piwb);
> +extern int ploop_map(struct dm_target *ti, struct bio *bio);
> +extern int ploop_endio(struct dm_target *ti, struct bio *bio, blk_status_t *err);
> +extern void ploop_inflight_bios_ref_switch(struct ploop *ploop);
> +extern struct dm_ploop_endio_hook *find_lk_of_cluster(struct ploop *ploop,
> +						      unsigned int cluster);
> +extern struct dm_ploop_endio_hook *find_endio_hook(struct ploop *ploop,
> +						   struct rb_root *root,
> +						   unsigned int cluster);
> +extern void unlink_postponed_backup_endio(struct ploop *ploop,
> +					  struct bio_list *bio_list,
> +					  struct dm_ploop_endio_hook *h);
> +
> +extern int ploop_prepare_reloc_index_wb(struct ploop *, struct ploop_index_wb *,
> +					unsigned int, unsigned int *);
> +extern void ploop_reset_bat_update(struct ploop_index_wb *);
> +extern void ploop_submit_index_wb_sync(struct ploop *, struct ploop_index_wb *);
> +extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
> +			 char *result, unsigned int maxlen);
> +extern int submit_cluster_cow(struct ploop *ploop, unsigned int level,
> +			      unsigned int cluster, unsigned int dst_cluster,
> +			      void (*end_fn)(struct ploop *, int, void *), void *data);
> +extern void restart_delta_cow(struct ploop *ploop);
> +extern void cancel_discard_bios(struct ploop *ploop);
> +
> +extern struct bio *alloc_bio_with_pages(struct ploop *ploop);
> +extern void free_bio_with_pages(struct ploop *ploop, struct bio *bio);
> +extern void bio_prepare_offsets(struct ploop *, struct bio *, unsigned int);
> +extern void ploop_free_pb(struct push_backup *pb);
> +extern void cleanup_backup(struct ploop *ploop);
> +
> +extern int ploop_read_cluster_sync(struct ploop *, struct bio *, unsigned int);
> +
> +extern int ploop_read_metadata(struct dm_target *ti, struct ploop *ploop);
> +extern int ploop_read_delta_metadata(struct ploop *ploop, struct file *file,
> +				     void **d_hdr);
> +
> +#endif /* __DM_PLOOP_H */
>
>
> .
>


