[Devel] [PATCH rh7 4/4] ploop: fix free_list starvation

Maxim Patlasov mpatlasov at virtuozzo.com
Mon Jul 11 22:00:21 PDT 2016


Under high load, and when push_backup is in progress, it is
possible that all preq-s from free_list will be consumed by
either incoming bio-s waiting for backup tool out-of-band
processing, or some incoming bio-s blocked on the former ones.

Then, ploop reaches maximum possible preq->active_reqs and
goes to sleep waiting for something. But this something is
actually the backup tool who is blocked on reading from
the ploop device. Deadlock.

The patch fixes the problem by queueing incoming WRITE bio-s
(which otherwise would be blocked on backup tool out-of-band
processing anyway) to a separate queue:
plo->pbd->bio_pending_list. Thus, we always have some free
preq-s for processing incoming READ bio-o from backup tool.

https://jira.sw.ru/browse/PSBM-49454

Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
 drivers/block/ploop/dev.c         |   67 +++++++++++++++++++++++++++++--------
 drivers/block/ploop/push_backup.c |   52 ++++++++++++++++++++++++++++-
 drivers/block/ploop/push_backup.h |    5 +++
 drivers/block/ploop/sysfs.c       |    6 +++
 include/linux/ploop/ploop.h       |    1 +
 5 files changed, 116 insertions(+), 15 deletions(-)

diff --git a/drivers/block/ploop/dev.c b/drivers/block/ploop/dev.c
index 6795b95..6d449b7 100644
--- a/drivers/block/ploop/dev.c
+++ b/drivers/block/ploop/dev.c
@@ -753,20 +753,43 @@ preallocate_bio(struct bio * orig_bio, struct ploop_device * plo)
 	return nbio;
 }
 
-static void process_bio_queue(struct ploop_device * plo,
-			      struct list_head *drop_list,
-			      int account_blockable)
+static void process_bio_queue_one(struct ploop_device * plo,
+				  struct list_head *drop_list,
+				  int check_push_backup)
+{
+	struct bio *bio = plo->bio_head;
+
+	BUG_ON (!plo->bio_tail);
+	plo->bio_head = plo->bio_head->bi_next;
+	if (!plo->bio_head)
+		plo->bio_tail = NULL;
+
+	if (check_push_backup &&
+	    (bio->bi_rw & REQ_WRITE) && bio->bi_size &&
+	    plo->free_qlen <= plo->free_qmax / 2 &&
+	    plo->blockable_reqs > plo->free_qmax / 4 &&
+	    ploop_pb_bio_detained(plo->pbd, bio))
+		plo->blocked_bios++;
+	else
+		ploop_bio_queue(plo, bio, drop_list, check_push_backup);
+}
+
+static void process_bio_queue_optional(struct ploop_device * plo,
+				       struct list_head *drop_list)
 {
-	while (plo->bio_head && !list_empty(&plo->free_list)) {
-		struct bio *tmp = plo->bio_head;
+	while (plo->bio_head && !list_empty(&plo->free_list) &&
+	       (!test_bit(PLOOP_S_PUSH_BACKUP, &plo->state) ||
+		plo->free_qlen > plo->free_qmax / 2))
+		process_bio_queue_one(plo, drop_list, 0);
+}
 
-		BUG_ON (!plo->bio_tail);
-		plo->bio_head = plo->bio_head->bi_next;
-		if (!plo->bio_head)
-			plo->bio_tail = NULL;
+static void process_bio_queue_main(struct ploop_device * plo,
+				   struct list_head *drop_list)
+{
+	int check = test_bit(PLOOP_S_PUSH_BACKUP, &plo->state);
 
-		ploop_bio_queue(plo, tmp, drop_list, account_blockable);
-	}
+	while (plo->bio_head && !list_empty(&plo->free_list))
+		process_bio_queue_one(plo, drop_list, check);
 }
 
 static void ploop_unplug(struct blk_plug_cb *cb, bool from_schedule)
@@ -1022,7 +1045,7 @@ queue:
 	ploop_congest(plo);
 
 	/* second chance to merge requests */
-	process_bio_queue(plo, &drop_list, 0);
+	process_bio_queue_optional(plo, &drop_list);
 
 queued:
 	/* If main thread is waiting for requests, wake it up.
@@ -2858,6 +2881,20 @@ static void ploop_handle_enospc_req(struct ploop_request *preq)
 	preq->iblock = 0;
 }
 
+static void
+process_pending_bios(struct ploop_device * plo, struct list_head *drop_list)
+{
+	while (!ploop_pb_bio_list_empty(plo->pbd) &&
+	       !list_empty(&plo->free_list) &&
+	       (plo->free_qlen > plo->free_qmax / 2 ||
+		plo->blockable_reqs <= plo->free_qmax / 4)) {
+		struct bio *bio = ploop_pb_bio_get(plo->pbd);
+
+		ploop_bio_queue(plo, bio, drop_list, 1);
+		plo->blocked_bios--;
+	}
+}
+
 /* Main process. Processing queues in proper order, handling pre-barrier
  * flushes and queue suspend while processing a barrier
  */
@@ -2879,7 +2916,8 @@ static int ploop_thread(void * data)
 	again:
 		BUG_ON (!list_empty(&drop_list));
 
-		process_bio_queue(plo, &drop_list, 1);
+		process_pending_bios(plo, &drop_list);
+		process_bio_queue_main(plo, &drop_list);
 		process_discard_bio_queue(plo, &drop_list);
 
 		if (!list_empty(&drop_list)) {
@@ -2958,7 +2996,8 @@ static int ploop_thread(void * data)
 		 */
 		if (kthread_should_stop() && !plo->active_reqs &&
 		    list_empty(&plo->entry_queue) && !plo->bio_head &&
-		    bio_list_empty(&plo->bio_discard_list))
+		    bio_list_empty(&plo->bio_discard_list) &&
+		    ploop_pb_bio_list_empty(plo->pbd))
 			break;
 
 wait_more:
diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index c58aadf..525576d 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -55,6 +55,8 @@ struct ploop_pushbackup_desc {
 	struct pb_set	      pending_set;
 	struct pb_set	      reported_set;
 
+	struct bio_list	      bio_pending_list;
+
 	struct task_struct   *health_monitor_thread;
 	wait_queue_head_t     ppb_waitq;
 	int		      ppb_state; /* see enum above */
@@ -184,6 +186,7 @@ struct ploop_pushbackup_desc *ploop_pb_alloc(struct ploop_device *plo)
 	ploop_pbs_init(&pbd->pending_set, pbd, "pending");
 	ploop_pbs_init(&pbd->reported_set, pbd, "reported");
 	init_waitqueue_head(&pbd->ppb_waitq);
+	bio_list_init(&pbd->bio_pending_list);
 	pbd->plo = plo;
 
 	return pbd;
@@ -679,6 +682,22 @@ bool ploop_pb_check_and_clear_bit(struct ploop_pushbackup_desc *pbd,
 	return true;
 }
 
+static void return_bios_back_to_plo(struct ploop_device *plo,
+				    struct bio_list *bl)
+{
+	if (!bl->head)
+		return;
+
+	if (plo->bio_tail)
+		plo->bio_tail->bi_next = bl->head;
+	else
+		plo->bio_head = bl->head;
+
+	plo->bio_tail = bl->tail;
+
+	bio_list_init(bl);
+}
+
 /* Always serialized by plo->ctl_mutex */
 unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 {
@@ -722,12 +741,13 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 		complete(&pbd->ppb_comp);
 	spin_unlock(&pbd->ppb_lock);
 
-	if (!list_empty(&drop_list)) {
+	if (!list_empty(&drop_list) || !ploop_pb_bio_list_empty(pbd)) {
 		struct ploop_device *plo = pbd->plo;
 
 		BUG_ON(!plo);
 		spin_lock_irq(&plo->lock);
 		list_splice_init(&drop_list, plo->ready_queue.prev);
+		return_bios_back_to_plo(plo, &pbd->bio_pending_list);
 		if (test_bit(PLOOP_S_WAIT_PROCESS, &plo->state))
 			wake_up_interruptible(&plo->waitq);
 		spin_unlock_irq(&plo->lock);
@@ -1045,3 +1065,33 @@ static void ploop_pb_timeout_func(unsigned long data)
 	}
 	spin_unlock(&pbd->ppb_lock);
 }
+
+/* Return true if bio was detained, false otherwise */
+bool ploop_pb_bio_detained(struct ploop_pushbackup_desc *pbd, struct bio *bio)
+{
+	cluster_t   clu = bio->bi_sector >> pbd->plo->cluster_log;
+
+	if (ploop_pb_check_and_clear_bit(pbd, clu)) {
+		bio_list_add(&pbd->bio_pending_list, bio);
+		return true;
+	}
+
+	return false;
+}
+
+/* Return true if no detained bio-s present, false otherwise */
+bool ploop_pb_bio_list_empty(struct ploop_pushbackup_desc *pbd)
+{
+	return !pbd || bio_list_empty(&pbd->bio_pending_list);
+}
+
+struct bio *ploop_pb_bio_get(struct ploop_pushbackup_desc *pbd)
+{
+	return bio_list_pop(&pbd->bio_pending_list);
+}
+
+void ploop_pb_bio_list_merge(struct ploop_pushbackup_desc *pbd,
+			     struct bio_list *tmp)
+{
+	bio_list_merge(&pbd->bio_pending_list, tmp);
+}
diff --git a/drivers/block/ploop/push_backup.h b/drivers/block/ploop/push_backup.h
index 0d479e0..0ed18e2 100644
--- a/drivers/block/ploop/push_backup.h
+++ b/drivers/block/ploop/push_backup.h
@@ -30,3 +30,8 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 			       struct ploop_request *preq);
 
 int ploop_pb_destroy(struct ploop_device *plo, __u32 *status);
+
+bool ploop_pb_bio_detained(struct ploop_pushbackup_desc *pbd, struct bio *bio);
+bool ploop_pb_bio_list_empty(struct ploop_pushbackup_desc *pbd);
+struct bio *ploop_pb_bio_get(struct ploop_pushbackup_desc *pbd);
+void ploop_pb_bio_list_merge(struct ploop_pushbackup_desc *pbd, struct bio_list *tmp);
diff --git a/drivers/block/ploop/sysfs.c b/drivers/block/ploop/sysfs.c
index 2160fb31..9e4c9ab 100644
--- a/drivers/block/ploop/sysfs.c
+++ b/drivers/block/ploop/sysfs.c
@@ -440,6 +440,11 @@ static u32 show_blockable_reqs(struct ploop_device * plo)
 	return plo->blockable_reqs;
 }
 
+static u32 show_blocked_bios(struct ploop_device * plo)
+{
+	return plo->blocked_bios;
+}
+
 #define _TUNE_U32(_name)				\
 static u32 show_##_name(struct ploop_device * plo)	\
 {							\
@@ -525,6 +530,7 @@ static struct attribute *state_attributes[] = {
 	_A(free_reqs),
 	_A(free_qmax),
 	_A(blockable_reqs),
+	_A(blocked_bios),
 	NULL
 };
 
diff --git a/include/linux/ploop/ploop.h b/include/linux/ploop/ploop.h
index 43bba66..859fe51 100644
--- a/include/linux/ploop/ploop.h
+++ b/include/linux/ploop/ploop.h
@@ -367,6 +367,7 @@ struct ploop_device
 	int			free_qlen; /* len of free_list */
 	int			free_qmax; /* max len of free_list */
 	int			blockable_reqs; /* depends on userspace tool */
+	int			blocked_bios; /* depends on userspace tool */
 
 	struct bio		*bio_head;
 	struct bio		*bio_tail;



More information about the Devel mailing list