[Devel] [PATCH RHEL7 COMMIT] ploop: push_backup: fix pbd->ppb_lock deadlock

Konstantin Khorenko khorenko at virtuozzo.com
Mon Oct 24 04:53:24 PDT 2016


The commit is pushed to "branch-rh7-3.10.0-327.36.1.vz7.19.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-327.36.1.vz7.19.3
------>
commit 4ca9e0d9190a401ea8fe864fe0483d843d1423d8
Author: Maxim Patlasov <mpatlasov at virtuozzo.com>
Date:   Mon Oct 24 15:53:24 2016 +0400

    ploop: push_backup: fix pbd->ppb_lock deadlock
    
    Ploop push_backup must use spin_lock_irq[save] for pbd->ppb_lock.
    Otherwise a classic deadlock is possible:
    
    1) vz_backup_client acquires ppb_lock:
    
      ploop_ioctl -->
        ploop_push_backup_io -->
          ploop_push_backup_io_read -->
            ploop_push_backup_io_get -->
              ploop_pb_get_pending -->
                spin_lock(&pbd->ppb_lock);
    
    2) ploop_thread spins on ppb_lock while holding plo->lock:
    
      ploop_thread calls "spin_lock_irq(&plo->lock);", then -->
        process_bio_queue_main -->
          process_bio_queue_one -->
            ploop_pb_bio_detained -->
              ploop_pb_check_and_clear_bit -->
                spin_lock(&pbd->ppb_lock);
    
    3) vz_backup_client is interrupted by bio completion:
    
      bio_endio -->
        bio->bi_end_io (== dio_endio_async) -->
          ploop_complete_io_request -->
            ploop_complete_io_state -->
              spin_lock_irqsave(&plo->lock, flags);
    
    From now on, the interrupt handler cannot proceed because ploop_thread
    holds plo->lock, ploop_thread cannot proceed because vz_backup_client
    holds ppb_lock, and vz_backup_client cannot proceed because it has been
    interrupted by that interrupt handler. Classic deadlock.
    
    Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
 drivers/block/ploop/push_backup.c | 60 ++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 525576d..f825575 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -349,7 +349,7 @@ static int ploop_pb_health_monitor(void * data)
 	struct ploop_pushbackup_desc *pbd = data;
 	struct ploop_device	     *plo = pbd->plo;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
 
 		DEFINE_WAIT(_wait);
@@ -359,21 +359,21 @@ static int ploop_pb_health_monitor(void * data)
 			    kthread_should_stop())
 				break;
 
-			spin_unlock(&pbd->ppb_lock);
+			spin_unlock_irq(&pbd->ppb_lock);
 			schedule();
-			spin_lock(&pbd->ppb_lock);
+			spin_lock_irq(&pbd->ppb_lock);
 		}
 		finish_wait(&pbd->ppb_waitq, &_wait);
 
 		if (pbd->ppb_state == PLOOP_PB_STOPPING) {
-			spin_unlock(&pbd->ppb_lock);
+			spin_unlock_irq(&pbd->ppb_lock);
 			mutex_lock(&plo->ctl_mutex);
 			ploop_pb_stop(pbd, true);
 			mutex_unlock(&plo->ctl_mutex);
-			spin_lock(&pbd->ppb_lock);
+			spin_lock_irq(&pbd->ppb_lock);
 		}
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return 0;
 }
 
@@ -633,21 +633,21 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 {
 	BUG_ON(!pbd);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	if (pbd->ppb_state != PLOOP_PB_ALIVE) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -ESTALE;
 	}
 
 	if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -EINTR;
 	}
 
 	if (check_bit_in_map(pbd->reported_map, pbd->ppb_block_max,
 			     preq->req_cluster)) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return -EALREADY;
 	}
 
@@ -656,7 +656,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 	if (pbd->ppb_waiting)
 		complete(&pbd->ppb_comp);
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return 0;
 }
 
@@ -708,20 +708,20 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 	if (pbd == NULL)
 		return 0;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	if (pbd->ppb_state == PLOOP_PB_DEAD) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 		return 0;
 	}
 	pbd->ppb_state = PLOOP_PB_DEAD;
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	ploop_pbs_fini(&pbd->pending_set);
 	ploop_pbs_fini(&pbd->reported_set);
 
 	merge_status = ploop_pb_cbt_map_release(pbd, do_merge);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	while (!RB_EMPTY_ROOT(&pbd->pending_set.tree)) {
 		struct ploop_request *preq =
@@ -739,7 +739,7 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 
 	if (pbd->ppb_waiting)
 		complete(&pbd->ppb_comp);
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	if (!list_empty(&drop_list) || !ploop_pb_bio_list_empty(pbd)) {
 		struct ploop_device *plo = pbd->plo;
@@ -763,7 +763,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 	struct ploop_request *preq, *npreq;
 	int err = 0;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	preq = ploop_pb_get_first_reqs_from_pending(pbd, &npreq);
 	if (!preq) {
@@ -785,7 +785,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 			goto get_pending_unlock;
 		}
 		pbd->ppb_waiting = true;
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irq(&pbd->ppb_lock);
 
 		mutex_unlock(&plo->ctl_mutex);
 		err = wait_for_completion_interruptible(&pbd->ppb_comp);
@@ -794,7 +794,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 		if (plo->pbd != pbd)
 			return -EINTR;
 
-		spin_lock(&pbd->ppb_lock);
+		spin_lock_irq(&pbd->ppb_lock);
 		pbd->ppb_waiting = false;
 		init_completion(&pbd->ppb_comp);
 
@@ -834,7 +834,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 	}
 
 get_pending_unlock:
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 	return err;
 }
 
@@ -875,7 +875,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
 	if (!page)
 		return -ENOMEM;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 	while (block < pbd->ppb_block_max) {
 		fill_page_to_backup(pbd, idx, page);
 		off = block & (BITS_PER_PAGE -1);
@@ -903,7 +903,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
 		idx++;
 		block = idx << (PAGE_SHIFT + 3);
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	__free_page(page);
 
@@ -951,7 +951,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
 	int n_found = 0;
 	LIST_HEAD(ready_list);
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irq(&pbd->ppb_lock);
 
 	ploop_pb_process_extent(&pbd->reported_set, clu, len, &ready_list, &n_found);
 	ploop_pb_process_extent(&pbd->pending_set, clu, len, &ready_list, NULL);
@@ -966,7 +966,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
 	 */
 	set_bits_in_map(pbd->reported_map, pbd->ppb_block_max, clu, len);
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irq(&pbd->ppb_lock);
 
 	if (!list_empty(&ready_list)) {
 		struct ploop_device *plo = pbd->plo;
@@ -1011,14 +1011,15 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
 	unsigned long tstamp = 0;
 	cluster_t clu = 0;
 	bool ret = false;
+	unsigned long flags;
 
 	if (!timeout)
 		return false;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irqsave(&pbd->ppb_lock, flags);
 
 	if (pbd->ppb_state != PLOOP_PB_ALIVE) {
-		spin_unlock(&pbd->ppb_lock);
+		spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 		return false;
 	}
 
@@ -1034,7 +1035,7 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
 			mod_timer(&pbs->timer, preq->tstamp + timeout + 1);
 	}
 
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 
 	if (ret)
 		printk(KERN_WARNING "Abort push_backup for ploop%d: found "
@@ -1050,6 +1051,7 @@ static void ploop_pb_timeout_func(unsigned long data)
 	struct pb_set                *pbs = (void*)data;
 	struct ploop_pushbackup_desc *pbd = pbs->pbd;
 	struct ploop_device          *plo = pbd->plo;
+	unsigned long flags;
 
 	if (!plo->tune.push_backup_timeout ||
 	    !test_bit(PLOOP_S_RUNNING, &plo->state) ||
@@ -1057,13 +1059,13 @@ static void ploop_pb_timeout_func(unsigned long data)
 	    !ploop_pb_set_expired(pbs))
 		return;
 
-	spin_lock(&pbd->ppb_lock);
+	spin_lock_irqsave(&pbd->ppb_lock, flags);
 	if (pbd->ppb_state == PLOOP_PB_ALIVE) {
 		pbd->ppb_state = PLOOP_PB_STOPPING;
 		if (waitqueue_active(&pbd->ppb_waitq))
 			wake_up_interruptible(&pbd->ppb_waitq);
 	}
-	spin_unlock(&pbd->ppb_lock);
+	spin_unlock_irqrestore(&pbd->ppb_lock, flags);
 }
 
 /* Return true if bio was detained, false otherwise */


More information about the Devel mailing list