[Devel] [PATCH rh7 5/6] ploop: push_backup: health monitor thread
Maxim Patlasov
mpatlasov at virtuozzo.com
Tue Jun 14 17:56:09 PDT 2016
ploop_pb_health_monitor() is a stand-alone thread that responds to the "expiration"
event by calling ploop_pb_stop(). It lives as long as the ploop_pushbackup_desc
exists.
The patch also introduces pbd->ppb_state with three possible values:
enum {
PLOOP_PB_ALIVE,
PLOOP_PB_STOPPING,
PLOOP_PB_DEAD,
};
PLOOP_PB_ALIVE is the default; it means that everything is fine. PLOOP_PB_DEAD
essentially means that ploop_pb_stop() was called. I.e. the bitmask was merged
back to CBT (unless userspace explicitly specified not to do it), and all
ploop requests from the pending/reported trees were forcibly returned to the
plo->ready queue. PLOOP_PB_STOPPING is a transitional state: we decided to
abort push_backup due to a timeout, but ploop_pb_health_monitor() has not
processed it yet.
https://jira.sw.ru/browse/PSBM-48082
Signed-off-by: Maxim Patlasov <mpatlasov at virtuozzo.com>
---
drivers/block/ploop/push_backup.c | 86 +++++++++++++++++++++++++++++++++++++
1 file changed, 85 insertions(+), 1 deletion(-)
diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 070e147..97bfa9f 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -24,6 +24,12 @@ struct pb_set {
struct ploop_pushbackup_desc *pbd; /* points to parent pbd */
};
+enum {
+ PLOOP_PB_ALIVE,
+ PLOOP_PB_STOPPING,
+ PLOOP_PB_DEAD,
+};
+
struct ploop_pushbackup_desc {
struct ploop_device *plo;
struct page **cbt_map; /* a 'snapshot' copy of CBT mask */
@@ -41,6 +47,10 @@ struct ploop_pushbackup_desc {
struct pb_set pending_set;
struct pb_set reported_set;
+
+ struct task_struct *health_monitor_thread;
+ wait_queue_head_t ppb_waitq;
+ int ppb_state; /* see enum above */
};
int ploop_pb_check_uuid(struct ploop_pushbackup_desc *pbd, __u8 *uuid)
@@ -166,6 +176,7 @@ struct ploop_pushbackup_desc *ploop_pb_alloc(struct ploop_device *plo)
init_completion(&pbd->ppb_comp);
ploop_pbs_init(&pbd->pending_set, pbd, "pending");
ploop_pbs_init(&pbd->reported_set, pbd, "reported");
+ init_waitqueue_head(&pbd->ppb_waitq);
pbd->plo = plo;
return pbd;
@@ -323,8 +334,42 @@ static int convert_map_to_map(struct ploop_pushbackup_desc *pbd)
}
+static int ploop_pb_health_monitor(void * data)
+{
+ struct ploop_pushbackup_desc *pbd = data;
+ struct ploop_device *plo = pbd->plo;
+
+ spin_lock(&pbd->ppb_lock);
+ while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
+
+ DEFINE_WAIT(_wait);
+ for (;;) {
+ prepare_to_wait(&pbd->ppb_waitq, &_wait, TASK_INTERRUPTIBLE);
+ if (pbd->ppb_state == PLOOP_PB_STOPPING ||
+ kthread_should_stop())
+ break;
+
+ spin_unlock(&pbd->ppb_lock);
+ schedule();
+ spin_lock(&pbd->ppb_lock);
+ }
+ finish_wait(&pbd->ppb_waitq, &_wait);
+
+ if (pbd->ppb_state == PLOOP_PB_STOPPING) {
+ spin_unlock(&pbd->ppb_lock);
+ mutex_lock(&plo->ctl_mutex);
+ ploop_pb_stop(pbd, true);
+ mutex_unlock(&plo->ctl_mutex);
+ spin_lock(&pbd->ppb_lock);
+ }
+ }
+ spin_unlock(&pbd->ppb_lock);
+ return 0;
+}
+
int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
{
+ struct task_struct *ts;
int rc;
memcpy(pbd->cbt_uuid, uuid, sizeof(pbd->cbt_uuid));
@@ -352,7 +397,18 @@ int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
if (rc)
return rc;
- return convert_map_to_map(pbd);
+ rc = convert_map_to_map(pbd);
+ if (rc)
+ return rc;
+
+ ts = kthread_create(ploop_pb_health_monitor, pbd, "ploop_pb_hm%d",
+ pbd->plo->index);
+ if (IS_ERR(ts))
+ return PTR_ERR(ts);
+
+ pbd->health_monitor_thread = ts;
+ wake_up_process(ts);
+ return 0;
}
void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
@@ -365,6 +421,11 @@ void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
if (!RB_EMPTY_ROOT(&pbd->reported_set.tree))
printk("ploop_pb_fini: reported_tree is not empty!\n");
+ if (pbd->health_monitor_thread) {
+ kthread_stop(pbd->health_monitor_thread);
+ pbd->health_monitor_thread = NULL;
+ }
+
if (pbd->plo) {
struct ploop_device *plo = pbd->plo;
mutex_lock(&plo->sysfs_mutex);
@@ -545,6 +606,11 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
spin_lock(&pbd->ppb_lock);
+ if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+ spin_unlock(&pbd->ppb_lock);
+ return -ESTALE;
+ }
+
if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
spin_unlock(&pbd->ppb_lock);
return -EINTR;
@@ -565,6 +631,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
return 0;
}
+/* Always serialized by plo->ctl_mutex */
unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
{
unsigned long ret = 0;
@@ -574,6 +641,14 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
if (pbd == NULL)
return 0;
+ spin_lock(&pbd->ppb_lock);
+ if (pbd->ppb_state == PLOOP_PB_DEAD) {
+ spin_unlock(&pbd->ppb_lock);
+ return 0;
+ }
+ pbd->ppb_state = PLOOP_PB_DEAD;
+ spin_unlock(&pbd->ppb_lock);
+
ploop_pbs_fini(&pbd->pending_set);
ploop_pbs_fini(&pbd->reported_set);
@@ -632,6 +707,10 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
}
/* blocking case */
+ if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+ err = -ESTALE;
+ goto get_pending_unlock;
+ }
if (unlikely(pbd->ppb_waiting)) {
/* Other task is already waiting for event */
err = -EBUSY;
@@ -655,6 +734,8 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
if (!preq) {
if (!test_bit(PLOOP_S_PUSH_BACKUP, &plo->state))
err = -EINTR;
+ else if (pbd->ppb_state != PLOOP_PB_ALIVE)
+ err = -ESTALE;
else if (signal_pending(current))
err = -ERESTARTSYS;
else err = -ENOENT;
@@ -719,6 +800,9 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
if (block >= pbd->ppb_block_max)
return -ENOENT;
+ if (pbd->ppb_state != PLOOP_PB_ALIVE)
+ return -ESTALE;
+
page = alloc_page(GFP_KERNEL);
if (!page)
return -ENOMEM;
More information about the Devel
mailing list