[Devel] [PATCH rh7] ploop: push_backup: fix pbd->ppb_lock deadlock
Maxim Patlasov
mpatlasov at virtuozzo.com
Wed Oct 19 22:55:28 PDT 2016
Ploop push_backup must use spin_lock_irq[save] for pbd->ppb_lock.
Otherwise a classic deadlock is possible:
1) vz_backup_client acquires ppb_lock:
ploop_ioctl -->
ploop_push_backup_io -->
ploop_push_backup_io_read -->
ploop_push_backup_io_get -->
ploop_pb_get_pending -->
spin_lock(&pbd->ppb_lock);
2) ploop_thread spins on ppb_lock while holding plo->lock:
ploop_thread calls "spin_lock_irq(&plo->lock);", then -->
process_bio_queue_main -->
process_bio_queue_one -->
ploop_pb_bio_detained -->
ploop_pb_check_and_clear_bit -->
spin_lock(&pbd->ppb_lock);
3) vz_backup_client is interrupted by bio completion:
bio_endio -->
bio->bi_end_io (== dio_endio_async) -->
ploop_complete_io_request -->
ploop_complete_io_state -->
spin_lock_irqsave(&plo->lock, flags);
From now on, the interrupt handler cannot proceed because ploop_thread
holds plo->lock; ploop_thread cannot proceed because vz_backup_client
holds ppb_lock; and vz_backup_client cannot proceed because it has been
preempted by that very interrupt handler. Classic deadlock.
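The rule the fix applies is the usual one: a spinlock taken while
holding a lock that is also contended from interrupt context must
itself be acquired with interrupts disabled. A minimal, self-contained
sketch of the pattern (illustrative names only, not the ploop code
itself):

    /* Sketch: two locks standing in for plo->lock and pbd->ppb_lock. */
    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(outer_lock);   /* stands in for plo->lock */
    static DEFINE_SPINLOCK(inner_lock);   /* stands in for pbd->ppb_lock */

    /* Process context, like ploop_thread: outer lock, then inner.
     * The plain spin_lock() on inner_lock is fine here because
     * spin_lock_irq() on outer_lock already disabled interrupts. */
    static void worker(void)
    {
            spin_lock_irq(&outer_lock);
            spin_lock(&inner_lock);
            /* ... process queued bios ... */
            spin_unlock(&inner_lock);
            spin_unlock_irq(&outer_lock);
    }

    /* Process context, like vz_backup_client in the ioctl path: takes
     * inner_lock alone.  With a plain spin_lock() here, a completion
     * interrupt arriving on this CPU can spin on outer_lock (held by
     * worker() on another CPU), while worker() spins on inner_lock
     * held by us -- the three-way deadlock described above.  Disabling
     * interrupts defers the completion until inner_lock is dropped. */
    static void client(void)
    {
            spin_lock_irq(&inner_lock);
            /* ... hand pending requests to the backup client ... */
            spin_unlock_irq(&inner_lock);
    }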
Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
drivers/block/ploop/push_backup.c | 60 +++++++++++++++++++------------------
1 file changed, 31 insertions(+), 29 deletions(-)
diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 525576d..f825575 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -349,7 +349,7 @@ static int ploop_pb_health_monitor(void * data)
struct ploop_pushbackup_desc *pbd = data;
struct ploop_device *plo = pbd->plo;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
DEFINE_WAIT(_wait);
@@ -359,21 +359,21 @@ static int ploop_pb_health_monitor(void * data)
kthread_should_stop())
break;
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
schedule();
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
}
finish_wait(&pbd->ppb_waitq, &_wait);
if (pbd->ppb_state == PLOOP_PB_STOPPING) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
mutex_lock(&plo->ctl_mutex);
ploop_pb_stop(pbd, true);
mutex_unlock(&plo->ctl_mutex);
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
}
}
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return 0;
}
@@ -633,21 +633,21 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
{
BUG_ON(!pbd);
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
if (pbd->ppb_state != PLOOP_PB_ALIVE) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return -ESTALE;
}
if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return -EINTR;
}
if (check_bit_in_map(pbd->reported_map, pbd->ppb_block_max,
preq->req_cluster)) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return -EALREADY;
}
@@ -656,7 +656,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
if (pbd->ppb_waiting)
complete(&pbd->ppb_comp);
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return 0;
}
@@ -708,20 +708,20 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
if (pbd == NULL)
return 0;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
if (pbd->ppb_state == PLOOP_PB_DEAD) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return 0;
}
pbd->ppb_state = PLOOP_PB_DEAD;
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
ploop_pbs_fini(&pbd->pending_set);
ploop_pbs_fini(&pbd->reported_set);
merge_status = ploop_pb_cbt_map_release(pbd, do_merge);
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
while (!RB_EMPTY_ROOT(&pbd->pending_set.tree)) {
struct ploop_request *preq =
@@ -739,7 +739,7 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
if (pbd->ppb_waiting)
complete(&pbd->ppb_comp);
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
if (!list_empty(&drop_list) || !ploop_pb_bio_list_empty(pbd)) {
struct ploop_device *plo = pbd->plo;
@@ -763,7 +763,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
struct ploop_request *preq, *npreq;
int err = 0;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
preq = ploop_pb_get_first_reqs_from_pending(pbd, &npreq);
if (!preq) {
@@ -785,7 +785,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
goto get_pending_unlock;
}
pbd->ppb_waiting = true;
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
mutex_unlock(&plo->ctl_mutex);
err = wait_for_completion_interruptible(&pbd->ppb_comp);
@@ -794,7 +794,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
if (plo->pbd != pbd)
return -EINTR;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
pbd->ppb_waiting = false;
init_completion(&pbd->ppb_comp);
@@ -834,7 +834,7 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
}
get_pending_unlock:
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
return err;
}
@@ -875,7 +875,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
if (!page)
return -ENOMEM;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
while (block < pbd->ppb_block_max) {
fill_page_to_backup(pbd, idx, page);
off = block & (BITS_PER_PAGE -1);
@@ -903,7 +903,7 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
idx++;
block = idx << (PAGE_SHIFT + 3);
}
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
__free_page(page);
@@ -951,7 +951,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
int n_found = 0;
LIST_HEAD(ready_list);
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irq(&pbd->ppb_lock);
ploop_pb_process_extent(&pbd->reported_set, clu, len, &ready_list, &n_found);
ploop_pb_process_extent(&pbd->pending_set, clu, len, &ready_list, NULL);
@@ -966,7 +966,7 @@ void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
*/
set_bits_in_map(pbd->reported_map, pbd->ppb_block_max, clu, len);
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irq(&pbd->ppb_lock);
if (!list_empty(&ready_list)) {
struct ploop_device *plo = pbd->plo;
@@ -1011,14 +1011,15 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
unsigned long tstamp = 0;
cluster_t clu = 0;
bool ret = false;
+ unsigned long flags;
if (!timeout)
return false;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irqsave(&pbd->ppb_lock, flags);
if (pbd->ppb_state != PLOOP_PB_ALIVE) {
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irqrestore(&pbd->ppb_lock, flags);
return false;
}
@@ -1034,7 +1035,7 @@ static bool ploop_pb_set_expired(struct pb_set *pbs)
mod_timer(&pbs->timer, preq->tstamp + timeout + 1);
}
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irqrestore(&pbd->ppb_lock, flags);
if (ret)
printk(KERN_WARNING "Abort push_backup for ploop%d: found "
@@ -1050,6 +1051,7 @@ static void ploop_pb_timeout_func(unsigned long data)
struct pb_set *pbs = (void*)data;
struct ploop_pushbackup_desc *pbd = pbs->pbd;
struct ploop_device *plo = pbd->plo;
+ unsigned long flags;
if (!plo->tune.push_backup_timeout ||
!test_bit(PLOOP_S_RUNNING, &plo->state) ||
@@ -1057,13 +1059,13 @@ static void ploop_pb_timeout_func(unsigned long data)
!ploop_pb_set_expired(pbs))
return;
- spin_lock(&pbd->ppb_lock);
+ spin_lock_irqsave(&pbd->ppb_lock, flags);
if (pbd->ppb_state == PLOOP_PB_ALIVE) {
pbd->ppb_state = PLOOP_PB_STOPPING;
if (waitqueue_active(&pbd->ppb_waitq))
wake_up_interruptible(&pbd->ppb_waitq);
}
- spin_unlock(&pbd->ppb_lock);
+ spin_unlock_irqrestore(&pbd->ppb_lock, flags);
}
/* Return true if bio was detained, false otherwise */
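A note on the two variants used above: the process-context paths
(ploop_pb_health_monitor, ploop_pb_preq_add_pending, ploop_pb_stop,
ploop_pb_get_pending, ploop_pb_peek, ploop_pb_put_reported) run with
interrupts enabled on entry, so the spin_lock_irq()/spin_unlock_irq()
pair is sufficient. ploop_pb_set_expired and ploop_pb_timeout_func,
however, are reached from the timer path, where the interrupt state on
entry should not be assumed, hence the spin_lock_irqsave()/
spin_unlock_irqrestore() pair there. A minimal sketch of that pattern
(illustrative names, not the ploop code):

    #include <linux/spinlock.h>
    #include <linux/timer.h>

    static DEFINE_SPINLOCK(example_lock);

    /* Timer callback: the interrupt state of the context we run in is
     * not ours to assume, so save it, disable, and restore on unlock. */
    static void example_timeout(unsigned long data)
    {
            unsigned long flags;

            spin_lock_irqsave(&example_lock, flags);
            /* ... flip state to STOPPING and wake up waiters ... */
            spin_unlock_irqrestore(&example_lock, flags);
    }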