[Devel] [PATCH RHEL7 COMMIT] ploop: push_backup: health monitor thread

Konstantin Khorenko khorenko at virtuozzo.com
Mon Jun 27 00:11:23 PDT 2016


The commit is pushed to "branch-rh7-3.10.0-327.18.2.vz7.14.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-327.18.2.vz7.14.20
------>
commit 6a64ab2f267073beffc51d4206edd5b0bc37b19a
Author: Maxim Patlasov <mpatlasov at virtuozzo.com>
Date:   Mon Jun 27 11:11:23 2016 +0400

    ploop: push_backup: health monitor thread
    
    ploop_pb_health_monitor() is a stand-alone thread who responds on "expiration"
    event by calling ploop_pb_stop(). It lives as long as ploop_pushbackup_desc
    exists.
    
    The patch also introduces pbd->ppb_state with three possible values:
    
    enum {
    	PLOOP_PB_ALIVE,
    	PLOOP_PB_STOPPING,
    	PLOOP_PB_DEAD,
    };
    
    PLOOP_PB_ALIVE is default, it means that everything is fine. PLOOP_PB_DEAD
    essentially means that ploop_pb_stop() was called. I.e. bitmask was merged
    back to CBT (unless userspace specify explicitly not to do it), and all
    ploop requests from pending/reported trees were forcibly returned to
    plo->ready queue. PLOOP_PB_STOPPING is a transitionary state: we decided to
    abort push_backup due to timeout, but ploop_pb_health_monitor() has not
    processed it yet.
    
    https://jira.sw.ru/browse/PSBM-48082
    
    Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
 drivers/block/ploop/push_backup.c | 86 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)

diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 5058ec1..8fae402 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -31,6 +31,12 @@ struct pb_set {
 	struct ploop_pushbackup_desc *pbd; /* points to parent pbd */
 };
 
+enum {
+	PLOOP_PB_ALIVE,
+	PLOOP_PB_STOPPING,
+	PLOOP_PB_DEAD,
+};
+
 struct ploop_pushbackup_desc {
 	struct ploop_device *plo;
 	struct page **cbt_map; /* a 'snapshot' copy of CBT mask */
@@ -48,6 +54,10 @@ struct ploop_pushbackup_desc {
 
 	struct pb_set	      pending_set;
 	struct pb_set	      reported_set;
+
+	struct task_struct   *health_monitor_thread;
+	wait_queue_head_t     ppb_waitq;
+	int		      ppb_state; /* see enum above */
 };
 
 int ploop_pb_check_uuid(struct ploop_pushbackup_desc *pbd, __u8 *uuid)
@@ -173,6 +183,7 @@ struct ploop_pushbackup_desc *ploop_pb_alloc(struct ploop_device *plo)
 	init_completion(&pbd->ppb_comp);
 	ploop_pbs_init(&pbd->pending_set, pbd, "pending");
 	ploop_pbs_init(&pbd->reported_set, pbd, "reported");
+	init_waitqueue_head(&pbd->ppb_waitq);
 	pbd->plo = plo;
 
 	return pbd;
@@ -330,8 +341,42 @@ static int convert_map_to_map(struct ploop_pushbackup_desc *pbd)
 
 }
 
+static int ploop_pb_health_monitor(void * data)
+{
+	struct ploop_pushbackup_desc *pbd = data;
+	struct ploop_device	     *plo = pbd->plo;
+
+	spin_lock(&pbd->ppb_lock);
+	while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
+
+		DEFINE_WAIT(_wait);
+		for (;;) {
+			prepare_to_wait(&pbd->ppb_waitq, &_wait, TASK_INTERRUPTIBLE);
+			if (pbd->ppb_state == PLOOP_PB_STOPPING ||
+			    kthread_should_stop())
+				break;
+
+			spin_unlock(&pbd->ppb_lock);
+			schedule();
+			spin_lock(&pbd->ppb_lock);
+		}
+		finish_wait(&pbd->ppb_waitq, &_wait);
+
+		if (pbd->ppb_state == PLOOP_PB_STOPPING) {
+			spin_unlock(&pbd->ppb_lock);
+			mutex_lock(&plo->ctl_mutex);
+			ploop_pb_stop(pbd, true);
+			mutex_unlock(&plo->ctl_mutex);
+			spin_lock(&pbd->ppb_lock);
+		}
+	}
+	spin_unlock(&pbd->ppb_lock);
+	return 0;
+}
+
 int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
 {
+	struct task_struct *ts;
 	int rc;
 
 	memcpy(pbd->cbt_uuid, uuid, sizeof(pbd->cbt_uuid));
@@ -359,7 +404,18 @@ int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
 	if (rc)
 		return rc;
 
-	return convert_map_to_map(pbd);
+	rc = convert_map_to_map(pbd);
+	if (rc)
+		return rc;
+
+	ts = kthread_create(ploop_pb_health_monitor, pbd, "ploop_pb_hm%d",
+			    pbd->plo->index);
+	if (IS_ERR(ts))
+		return PTR_ERR(ts);
+
+	pbd->health_monitor_thread = ts;
+	wake_up_process(ts);
+	return 0;
 }
 
 void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
@@ -372,6 +428,11 @@ void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
 	if (!RB_EMPTY_ROOT(&pbd->reported_set.tree))
 		printk("ploop_pb_fini: reported_tree is not empty!\n");
 
+	if (pbd->health_monitor_thread) {
+		kthread_stop(pbd->health_monitor_thread);
+		pbd->health_monitor_thread = NULL;
+	}
+
 	if (pbd->plo) {
 		struct ploop_device *plo = pbd->plo;
 		mutex_lock(&plo->sysfs_mutex);
@@ -552,6 +613,11 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 
 	spin_lock(&pbd->ppb_lock);
 
+	if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+		spin_unlock(&pbd->ppb_lock);
+		return -ESTALE;
+	}
+
 	if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
 		spin_unlock(&pbd->ppb_lock);
 		return -EINTR;
@@ -572,6 +638,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 	return 0;
 }
 
+/* Always serialized by plo->ctl_mutex */
 unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 {
 	unsigned long ret = 0;
@@ -581,6 +648,14 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 	if (pbd == NULL)
 		return 0;
 
+	spin_lock(&pbd->ppb_lock);
+	if (pbd->ppb_state == PLOOP_PB_DEAD) {
+		spin_unlock(&pbd->ppb_lock);
+		return 0;
+	}
+	pbd->ppb_state = PLOOP_PB_DEAD;
+	spin_unlock(&pbd->ppb_lock);
+
 	ploop_pbs_fini(&pbd->pending_set);
 	ploop_pbs_fini(&pbd->reported_set);
 
@@ -639,6 +714,10 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 		}
 
                 /* blocking case */
+		if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+			err = -ESTALE;
+			goto get_pending_unlock;
+		}
 		if (unlikely(pbd->ppb_waiting)) {
 			/* Other task is already waiting for event */
 			err = -EBUSY;
@@ -662,6 +741,8 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
 		if (!preq) {
 			if (!test_bit(PLOOP_S_PUSH_BACKUP, &plo->state))
 				err = -EINTR;
+			else if (pbd->ppb_state != PLOOP_PB_ALIVE)
+				err =  -ESTALE;
 			else if (signal_pending(current))
 				err = -ERESTARTSYS;
 			else err = -ENOENT;
@@ -726,6 +807,9 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
 	if (block >= pbd->ppb_block_max)
 		return -ENOENT;
 
+	if (pbd->ppb_state != PLOOP_PB_ALIVE)
+		return -ESTALE;
+
 	page = alloc_page(GFP_KERNEL);
 	if (!page)
 		return -ENOMEM;


More information about the Devel mailing list