[Devel] [PATCH rh7] ploop: push_backup: fix pbd->ppb_lock deadlock

Maxim Patlasov mpatlasov at virtuozzo.com
Wed Oct 19 22:55:28 PDT 2016


Ploop push_backup must use spin_lock_irq[save] for pbd->ppb_lock.
Otherwise a classic deadlock is possible:

1) vz_backup_client acquires ppb_lock:

  ploop_ioctl -->
    ploop_push_backup_io -->
      ploop_push_backup_io_read -->
        ploop_push_backup_io_get -->
          ploop_pb_get_pending -->
            spin_lock(&pbd->ppb_lock);

2) ploop_thread spins on ppb_lock while holding plo->lock:

  ploop_thread calls "spin_lock_irq(&plo->lock);", then -->
    process_bio_queue_main -->
      process_bio_queue_one -->
        ploop_pb_bio_detained -->
          ploop_pb_check_and_clear_bit -->
            spin_lock(&pbd->ppb_lock);

3) vz_backup_client is interrupted by bio completion:

  bio_endio -->
    bio->bi_end_io (== dio_endio_async) -->
      ploop_complete_io_request -->
        ploop_complete_io_state -->
          spin_lock_irqsave(&plo->lock, flags);

From now on, the interrupt handler cannot proceed because ploop_thread
holds plo->lock, ploop_thread cannot proceed because vz_backup_client
holds ppb_lock, and vz_backup_client cannot proceed because it has been
interrupted by that interrupt handler. Classic deadlock.

Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
 drivers/block/ploop/push_backup.c |   60 +++++++++++++++++++------------------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 525576d..f825575 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -349,7 +349,7 @@ static int ploop_pb_health_monitor(void * data)
 	struct ploop_pushbackup_desc *pbd = data;
 	struct ploop_device	     *plo = pbd->plo;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
 
 		DEFINE_WAIT(_wait);
@@ -359,21 +359,21 @@ static int ploop_pb_health_monitor(void * data)
 			    kthread_should_stop())
 				break;
 
-			spin_unlock(&pbd->ppb_lock);
+			spin_unlock_irq(&pbd->ppb_lock);
 			schedule();
-			spin_lock(&pbd->ppb_lock);
+			spin_lock_irq(&pbd->ppb_lock);
 		}
 		finish_wait(&pbd->ppb_waitq, &_wait);
 
 		if (pbd->ppb_state == PLOOP_PB_STOPPING) {
-			spin_unlock(&pbd->ppb_lock);
+			spin_unlock_irq(&pbd->ppb_lock);
 			mutex_lock(&plo->ctl_mutex);
 			ploop_pb_stop(pbd, true);
 			mutex_unlock(&plo->ctl_mutex);
-			spin_lock(&pbd->ppb_lock);
+			spin_lock_irq(&pbd->ppb_lock);
 		}
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return 0;
 }
 
@@ -633,21 +633,21 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 {
 	BUG_ON(!pbd);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	if (pbd->ppb_state != PLOOP_PB_ALIVE) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -ESTALE;
 	}
 
 	if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -EINTR;
 	}
 
 	if (check_bit_in_map(pbd->reported_map, pbd->ppb_block_max,
 			     preq->req_cluster)) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -EALREADY;
 	}
 
@@ -656,7 +656,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 	if (pbd->ppb_waiting)
 		complete(&pbd->ppb_comp);
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return 0;
 }
 
@@ -708,20 +708,20 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 	if (pbd == NULL)
 		return 0;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	if (pbd->ppb_state == PLOOP_PB_DEAD) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return 0;
 	}
 	pbd->ppb_state = PLOOP_PB_DEAD;
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	ploop_pbs_fini(&pbd->pending_set);
 	ploop_pbs_fini(&pbd->reported_set);
 
 	merge_status = ploop_pb_cbt_map_release(pbd, do_merge);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	while (!RB_EMPTY_ROOT(&pbd->pending_set.tree)) {
 		struct ploop_request *preq =
@@ -739,7 +739,7 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 
 	if (pbd->ppb_waiting)
 		complete(&pbd->ppb_comp);
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	if (!list_empty(&drop_list) || !ploop_pb_bio_list_empty(pbd)) {
 		struct ploop_device *plo = pbd->plo;
@@ -763,7 +763,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 	struct ploop_request *preq, *npreq;
 	int err = 0;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	preq = ploop_pb_get_first_reqs_from_pending(pbd, &npreq);
 	if (!preq) {
@@ -785,7 +785,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 			goto get_pending_unlock;
 		}
 		pbd->ppb_waiting = true;
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 
 		mutex_unlock(&plo->ctl_mutex);
 		err = wait_for_completion_interruptible(&pbd->ppb_comp);
@@ -794,7 +794,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 		if (plo->pbd != pbd)
 			return -EINTR;
 
-		spin_lock(&pbd->ppb_lock);
+		spin_lock_irq(&pbd->ppb_lock);
 		pbd->ppb_waiting = false;
 		init_completion(&pbd->ppb_comp);
 
@@ -834,7 +834,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 	}
 
 get_pending_unlock:
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return err;
 }
 
@@ -875,7 +875,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
 	if (!page)
 		return -ENOMEM;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	while (block < pbd->ppb_block_max) {
 		fill_page_to_backup(pbd, idx, page);
 		off = block & (BITS_PER_PAGE -1);
@@ -903,7 +903,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
 		idx++;
 		block = idx << (PAGE_SHIFT + 3);
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	__free_page(page);
 
@@ -951,7 +951,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
 	int n_found = 0;
 	LIST_HEAD(ready_list);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	ploop_pb_process_extent(&pbd->reported_set, clu, len, &ready_list, &n_found);
 	ploop_pb_process_extent(&pbd->pending_set, clu, len, &ready_list, NULL);
@@ -966,7 +966,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
 	 */
 	set_bits_in_map(pbd->reported_map, pbd->ppb_block_max, clu, len);
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	if (!list_empty(&ready_list)) {
 		struct ploop_device *plo = pbd->plo;
@@ -1011,14 +1011,15 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
 	unsigned long tstamp = 0;
 	cluster_t clu = 0;
 	bool ret = false;
+	unsigned long flags;
 
 	if (!timeout)
 		return false;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irqsave(&pbd->ppb_lock, flags);
 
 	if (pbd->ppb_state != PLOOP_PB_ALIVE) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 		return false;
 	}
 
@@ -1034,7 +1035,7 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
 			mod_timer(&pbs->timer, preq->tstamp + timeout + 1);
 	}
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 
 	if (ret)
 		printk(KERN_WARNING "Abort push_backup for ploop%d: found "
@@ -1050,6 +1051,7 @@ static void ploop_pb_timeout_func(unsigned long data)
 	struct pb_set                *pbs = (void*)data;
 	struct ploop_pushbackup_desc *pbd = pbs->pbd;
 	struct ploop_device          *plo = pbd->plo;
+	unsigned long flags;
 
 	if (!plo->tune.push_backup_timeout ||
 	    !test_bit(PLOOP_S_RUNNING, &plo->state) ||
@@ -1057,13 +1059,13 @@ static void ploop_pb_timeout_func(unsigned long data)
 	    !ploop_pb_set_expired(pbs))
 		return;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irqsave(&pbd->ppb_lock, flags);
 	if (pbd->ppb_state == PLOOP_PB_ALIVE) {
 		pbd->ppb_state = PLOOP_PB_STOPPING;
 		if (waitqueue_active(&pbd->ppb_waitq))
 			wake_up_interruptible(&pbd->ppb_waitq);
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 }
 
 /* Return true if bio was detained, false otherwise */



More information about the Devel mailing list