[Devel] [PATCH VZ9 3/3] fs: fuse: pcs: implement dislog

Alexey Kuznetsov kuznet at virtuozzo.com
Tue Oct 1 21:44:36 MSK 2024


This is the event infrastructure required to fix multiple problems with
delayed detection of cluster failures. We closed the corresponding bugs,
but forgot to port the required processing to the kernel module.

Nothing fancy: it is a straight port of the userspace code with some
kernel technicalities. Note that since the event log is a flexible,
extensible subsystem, we will need to keep the kernel side up to date;
let's not forget that.

Signed-off-by: Alexey Kuznetsov <kuznet at virtuozzo.com>
---
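Note for reviewers: a dislog event is a single u64 tag packing a 56-bit
object id together with a one-byte flags field. Below is a minimal
standalone sketch of the encoding, mirroring struct pcs_mds_cached_event
from pcs_mds_prot.h; it assumes a little-endian host, as the kernel code
effectively does:

	#include <stdint.h>
	#include <stdio.h>

	/* Mirror of struct pcs_mds_cached_event: bytes 0..6 carry the
	 * low 56 bits of the object id, byte 7 carries the
	 * PCS_MDS_EVT_F_* flags.
	 */
	union cached_event {
		struct {
			uint8_t node_id[7];
			uint8_t flags;
		};
		uint64_t tag;
	};

	/* Same operation as dislog_evt_obj_id(): recover the object id
	 * by clearing the flags byte.
	 */
	static uint64_t evt_obj_id(union cached_event evt)
	{
		evt.flags = 0;
		return evt.tag;
	}

	int main(void)
	{
		union cached_event evt = { .tag = 0x1234 };

		/* PCS_MDS_EVT_F_OBJ_CS | PCS_MDS_EVT_F_DOWN */
		evt.flags = 0x20 | 1;
		printf("tag=%#llx obj=%#llx\n",
		       (unsigned long long)evt.tag,
		       (unsigned long long)evt_obj_id(evt));
		return 0;
	}
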
 fs/fuse/kio/pcs/pcs_cs.c           | 217 ++++++++++++++++++++++++++++++++++---
 fs/fuse/kio/pcs/pcs_cs.h           |  19 ++++
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  33 ++++++
 fs/fuse/kio/pcs/pcs_ioctl.h        |   3 +
 fs/fuse/kio/pcs/pcs_map.c          | 104 +++++++++++++++---
 fs/fuse/kio/pcs/pcs_map.h          |   4 +
 fs/fuse/kio/pcs/pcs_mds_prot.h     |  75 ++++++++++++-
 fs/fuse/kio/pcs/pcs_prot_types.h   |   5 +
 8 files changed, 424 insertions(+), 36 deletions(-)
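
The userspace daemon is expected to push each disruption-log event down
through the new PCS_IOC_DISLOG ioctl. A minimal sketch of the calling
side, assuming the caller already has the kio control fd used for the
other PCS_IOC_* requests (obtaining that fd, and the request length the
transport passes down, are unchanged and out of scope here);
dislog_report_host_down() is a hypothetical helper, not part of this
patch:

	#include <stdint.h>
	#include <sys/ioctl.h>

	#define PCS_IOC_DISLOG	_IO('V', 64)

	/* Same layout as struct pcs_mds_cached_event */
	struct pcs_mds_cached_event {
		union {
			struct {
				uint8_t node_id[7];
				uint8_t flags;
			};
			uint64_t tag;
		};
	} __attribute__((aligned(8)));

	static int dislog_report_host_down(int kio_fd, uint64_t host_id)
	{
		struct pcs_mds_cached_event ev;

		ev.tag = host_id;	/* low 56 bits of the host id */
		/* PCS_MDS_EVT_F_OBJ_HOST | PCS_MDS_EVT_F_DOWN */
		ev.flags = 0x80 | 1;
		return ioctl(kio_fd, PCS_IOC_DISLOG, &ev);
	}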

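The heart of the delayed-detection fix is three sequence counters on the
cs set (cs_addr_change_sn, offline_host_sn, ill_cs_sn). Every cslist
snapshots them when it is built, and map_chk_stale() in pcs_map.c
compares the snapshot with the live counters on each submit, so the
common case remains a three-int comparison and the replica list is only
walked after an event has actually arrived. A toy userspace model of the
pattern (all names here are ad hoc):

	#include <stdatomic.h>
	#include <stdbool.h>

	struct css_counters {		/* global, bumped by dislog events */
		atomic_int addr_sn, host_sn, ill_sn;
	};

	struct csl_snap {		/* stored in each replica list */
		int addr_sn, host_sn, ill_sn;
	};

	static void csl_snapshot(struct csl_snap *s, struct css_counters *c)
	{
		s->addr_sn = atomic_load(&c->addr_sn);
		s->host_sn = atomic_load(&c->host_sn);
		s->ill_sn  = atomic_load(&c->ill_sn);
	}

	/* Fast path: if no counter moved, the replica list cannot have
	 * gone stale and IO is submitted without further checks.
	 */
	static bool csl_maybe_stale(const struct csl_snap *s,
				    struct css_counters *c)
	{
		return s->addr_sn != atomic_load(&c->addr_sn) ||
		       s->host_sn != atomic_load(&c->host_sn) ||
		       s->ill_sn  != atomic_load(&c->ill_sn);
	}
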
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 7bf6288..ed3de75 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -143,24 +143,24 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css)
 	return cs;
 }
 
-unsigned int pcs_cs_hash(PCS_NODE_ID_T *id)
+static inline unsigned int pcs_cs_hash(u64 id)
 {
-	return *(unsigned int *)id % PCS_CS_HASH_SIZE;
+	return id % PCS_CS_HASH_SIZE;
 }
 
 static struct pcs_cs *
-__lookup_cs(struct pcs_cs_set *csset, PCS_NODE_ID_T *id)
+__lookup_cs(struct pcs_cs_set *csset, u64 id)
 {
 	struct pcs_cs *cs;
 	hlist_for_each_entry_rcu(cs, &csset->ht[pcs_cs_hash(id)], hlist) {
-		if (memcmp(&cs->id, id, sizeof(cs->id)) == 0)
+		if (cs->id.val == id)
 			return cs;
 	}
 	return NULL;
 }
 
 static struct pcs_cs *
-lookup_and_lock_cs(struct pcs_cs_set *csset, PCS_NODE_ID_T *id)
+lookup_and_lock_cs(struct pcs_cs_set *csset, u64 id)
 {
 	struct pcs_cs *cs;
 retry:
@@ -181,7 +181,7 @@ unsigned int pcs_cs_hash(PCS_NODE_ID_T *id)
 
 static void add_cs(struct pcs_cs_set *csset, struct pcs_cs *cs)
 {
-	unsigned int hash = pcs_cs_hash(&cs->id);
+	unsigned int hash = pcs_cs_hash(cs->id.val);
 
 	assert_spin_locked(&csset->lock);
 
@@ -196,7 +196,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
 	struct pcs_cs *cs;
 
 again:
-	cs = lookup_and_lock_cs(csset, id);
+	cs = lookup_and_lock_cs(csset, id->val);
 	if (cs) {
 		/* If rpc is connected, leave it connected until failure.
 		 * After current connect fails, reconnect will be done to new address
@@ -206,7 +206,8 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
 			if (addr->type != PCS_ADDRTYPE_NONE) {
 				if (pcs_netaddr_cmp(&cs->addr, addr)) {
 					cs->addr = *addr;
-					cs->addr_serno++;
+					cs->addr_serno =
+						atomic_inc_return(&csset->cs_addr_change_sn);
 
 					FUSE_KTRACE(cc_from_csset(csset)->fc,
 						    "Port change CS" NODE_FMT " seq=%d",
@@ -250,10 +251,9 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
 
 	spin_lock(&cs->lock);
 	spin_lock(&csset->lock);
-	if (__lookup_cs(csset, id)) {
+	if (__lookup_cs(csset, id->val)) {
 		spin_unlock(&csset->lock);
 		cs->is_dead = 1;
-		spin_unlock(&cs->lock);
 		pcs_cs_destroy(cs);
 		goto again;
 	}
@@ -836,7 +836,7 @@ void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
 		who = cs;
 		spin_lock(&who->lock);
 	} else
-		who = lookup_and_lock_cs(cs->css, &h->xid.origin);
+		who = lookup_and_lock_cs(cs->css, h->xid.origin.val);
 
 	if (who && !who->cwr_state) {
 		/* Unless node is already reducing congestion window, shrink it
@@ -891,7 +891,7 @@ void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *ms
 	struct pcs_cs *who;
 
 	/* Some CS reported it cannot complete local IO in time, close congestion window */
-	who = lookup_and_lock_cs(cs->css, &h->xid.origin);
+	who = lookup_and_lock_cs(cs->css, h->xid.origin.val);
 	if (who) {
 		struct pcs_int_request *ireq = req->private2;
 		abs_time_t lat = 0; /* GCC bug */
@@ -960,7 +960,7 @@ void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
 		return;
 	}
 
-	cs = lookup_and_lock_cs(&cc->css, &err->offender);
+	cs = lookup_and_lock_cs(&cc->css, err->offender.val);
 	if (cs == NULL)
 		return;
 
@@ -1221,7 +1221,7 @@ static void cs_probe_done(struct pcs_msg *msg)
 	struct pcs_cs_set *css = msg->private;
 	struct pcs_cs *cs;
 
-	cs = lookup_and_lock_cs(css, &msg->rpc->peer_id);
+	cs = lookup_and_lock_cs(css, msg->rpc->peer_id.val);
 
 	if (cs) {
 		if (!pcs_if_error(&msg->error)) {
@@ -1320,6 +1320,8 @@ void pcs_csset_init(struct pcs_cs_set *css)
 
 	for (i = 0; i < PCS_CS_HASH_SIZE; i++)
 		INIT_HLIST_HEAD(&css->ht[i]);
+	for (i = 0; i < PCS_HOST_HASH_SIZE; i++)
+		INIT_HLIST_HEAD(&css->hht[i]);
 
 	INIT_LIST_HEAD(&css->lru);
 	INIT_LIST_HEAD(&css->bl_list);
@@ -1327,6 +1329,9 @@ void pcs_csset_init(struct pcs_cs_set *css)
 	css->ncs = 0;
 	spin_lock_init(&css->lock);
 	atomic64_set(&css->csl_serno_gen, 0);
+	atomic_set(&css->cs_addr_change_sn, 0);
+	atomic_set(&css->offline_host_sn, 0);
+	atomic_set(&css->ill_cs_sn, 0);
 }
 
 static void pcs_cs_wait_unused(struct pcs_cs *cs)
@@ -1341,6 +1346,96 @@ static void pcs_cs_wait_unused(struct pcs_cs *cs)
 	cs->use_count--;
 }
 
+static void host_destroy_rcu(struct rcu_head *head)
+{
+	kfree(container_of(head, struct pcs_host, rcu));
+}
+
+static inline unsigned int pcs_host_hash(u64 val)
+{
+	return jhash_2words(val, val>>32, 0) & (PCS_HOST_HASH_SIZE - 1);
+}
+
+static inline u64 pcs_dislog_obj_id(u64 node_id)
+{
+	struct pcs_mds_cached_event evt;
+
+	evt.tag = node_id;
+	BUILD_BUG_ON(sizeof(evt.node_id)+1 != sizeof(node_id));
+	evt.node_id[sizeof(evt.node_id)-1] ^= evt.flags; /* XOR with high order byte of node id */
+	evt.flags = 0;
+	return evt.tag;
+}
+
+int pcs_dislog_is_host_down(struct pcs_cs_set *css, PCS_NODE_ID_T host_id)
+{
+	u64 val = pcs_dislog_obj_id(host_id.val);
+	unsigned int hash = pcs_host_hash(val);
+	struct pcs_host *h;
+	int found = 0;
+
+	rcu_read_lock();
+	hlist_for_each_entry_rcu(h, css->hht + hash, link) {
+		if (h->host_id.val == val) {
+			found = 1;
+			break;
+		}
+	}
+	rcu_read_unlock();
+	return found;
+}
+
+void pcs_dislog_host_add(struct pcs_cs_set *css, u64 host_id)
+{
+	u64 val = pcs_dislog_obj_id(host_id);
+	unsigned int hash = pcs_host_hash(val);
+	struct pcs_host *h, *nh;
+	int complain = 0;
+
+	nh = kmalloc(sizeof(struct pcs_host), GFP_NOIO);
+
+	spin_lock(&css->lock);
+	hlist_for_each_entry(h, css->hht + hash, link) {
+		if (h->host_id.val == val)
+			break;
+	}
+	if (!h) {
+		if (nh) {
+			nh->host_id.val = val;
+			hlist_add_head_rcu(&nh->link, css->hht + hash);
+		} else
+			complain = 1;
+		nh = NULL;
+	}
+	spin_unlock(&css->lock);
+	if (h && nh)
+		kfree(nh);
+	if (!h)
+		FUSE_KTRACE(cc_from_csset(css)->fc,
+			    "Host#%lu is down%s",
+			    (unsigned long)val, complain ? ", failed to remember" : "");
+}
+
+void pcs_dislog_host_del(struct pcs_cs_set *css, u64 host_id)
+{
+	u64 val = pcs_dislog_obj_id(host_id);
+	unsigned int hash = pcs_host_hash(val);
+	struct pcs_host *h;
+
+	spin_lock(&css->lock);
+	hlist_for_each_entry(h, css->hht + hash, link) {
+		if (h->host_id.val == val)
+			break;
+	}
+	if (h)
+		hlist_del_rcu(&h->link);
+	spin_unlock(&css->lock);
+	if (h) {
+		FUSE_KTRACE(cc_from_csset(css)->fc, "Host#%lu is up", (unsigned long)val);
+		call_rcu(&h->rcu, host_destroy_rcu);
+	}
+}
+
 void pcs_csset_fini(struct pcs_cs_set *css)
 {
 	unsigned int i;
@@ -1373,6 +1468,17 @@ void pcs_csset_fini(struct pcs_cs_set *css)
 		spin_unlock(&css->lock);
 
 	}
+	for (i = 0; i < PCS_HOST_HASH_SIZE; i++) {
+		spin_lock(&css->lock);
+		while (!hlist_empty(&css->hht[i])) {
+			struct pcs_host *h = hlist_entry(css->hht[i].first, struct pcs_host, link);
+
+			hlist_del_rcu(&h->link);
+			call_rcu(&h->rcu, host_destroy_rcu);
+		}
+		spin_unlock(&css->lock);
+	}
+
 	cancel_delayed_work_sync(&css->bl_work);
 	/* NOTE: It looks like	must being empty at destruction */
 	BUG_ON(!list_empty(&to_resubmit));
@@ -1382,8 +1488,6 @@ void pcs_csset_fini(struct pcs_cs_set *css)
 	BUG_ON(!list_empty(&css->bl_list));
 	BUG_ON(!list_empty(&css->lru));
 	BUG_ON(css->ncs);
-
-
 }
 
 int pcs_cs_for_each_entry(struct pcs_cs_set *set, int (*cb)(struct pcs_cs *cs, void *arg), void *arg)
@@ -1436,3 +1540,84 @@ int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs)
 	spin_unlock(&cs->lock);
 	return queued;
 }
+
+static struct pcs_cs *
+__lookup_cs_dislog(struct pcs_cs_set *csset, u64 id)
+{
+	struct pcs_cs *cs;
+
+	hlist_for_each_entry_rcu(cs, &csset->ht[pcs_cs_hash(id)], hlist) {
+		if (pcs_dislog_obj_id(cs->id.val) == id)
+			return cs;
+	}
+	return NULL;
+}
+
+static struct pcs_cs *
+lookup_and_lock_cs_dislog(struct pcs_cs_set *csset, u64 id)
+{
+	struct pcs_cs *cs;
+retry:
+	rcu_read_lock();
+	cs = __lookup_cs_dislog(csset, id);
+	if (!cs) {
+		rcu_read_unlock();
+		return NULL;
+	}
+	spin_lock(&cs->lock);
+	rcu_read_unlock();
+	if (cs->is_dead) {
+		spin_unlock(&cs->lock);
+		goto retry;
+	}
+	return cs;
+}
+
+int pcs_cs_dislog_event(struct pcs_cluster_core *cc, struct pcs_mds_cached_event *evt)
+{
+	struct pcs_cs *cs;
+
+	cs = lookup_and_lock_cs_dislog(&cc->css, dislog_evt_obj_id(evt));
+	if (!cs)
+		return 0;
+
+	switch (evt->flags & PCS_MDS_EVT_F_TYPE_MASK) {
+	case PCS_MDS_EVT_F_ADDR_CHANGED:
+		cs->addr_serno = atomic_inc_return(&cc->css.cs_addr_change_sn);
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    "Dislog addr change CS" NODE_FMT " seq=%d",
+			    NODE_ARGS(cs->id), cs->addr_serno);
+		break;
+	case PCS_MDS_EVT_F_MAINTENANCE_IN:
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    "Dislog enter maintanance CS" NODE_FMT,
+			    NODE_ARGS(cs->id));
+		cs->mds_flags |= CS_FL_MAINTENANCE;
+		set_bit(CS_SF_MAINTENANCE, &cs->state);
+		break;
+	case PCS_MDS_EVT_F_MAINTENANCE_OUT:
+		cs->mds_flags &= ~CS_FL_MAINTENANCE;
+		clear_bit(CS_SF_MAINTENANCE, &cs->state);
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    "Dislog leave maintanance CS" NODE_FMT,
+			    NODE_ARGS(cs->id));
+		break;
+	case PCS_MDS_EVT_F_ILL_IN:
+		cs->mds_flags |= CS_FL_ILL;
+		set_bit(CS_SF_ILL, &cs->state);
+		atomic_inc(&cc->css.ill_cs_sn);
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    "Dislog enter ILL CS" NODE_FMT,
+			    NODE_ARGS(cs->id));
+		break;
+	case PCS_MDS_EVT_F_ILL_OUT:
+		cs->mds_flags &= ~CS_FL_ILL;
+		clear_bit(CS_SF_ILL, &cs->state);
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    "Dislog leave ILL CS" NODE_FMT,
+			    NODE_ARGS(cs->id));
+		break;
+	}
+	spin_unlock(&cs->lock);
+	return 0;
+}
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 62b88f6..0cb7d50 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -38,9 +38,11 @@
 enum {
 	CS_SF_LOCAL,
 	CS_SF_LOCAL_SOCK,
+	CS_SF_MAINTENANCE,
 	CS_SF_INACTIVE,
 	CS_SF_REPLICATING,
 	CS_SF_FAILED,
+	CS_SF_ILL,
 	CS_SF_BLACKLISTED,
 	CS_SF_ACTIVE,
 };
@@ -129,6 +131,7 @@ static inline void pcs_cs_activate_cong_queue(struct pcs_cs *cs)
 int pcs_cs_cong_enqueue_cond(struct pcs_int_request *ireq, struct pcs_cs *cs);
 
 #define PCS_CS_HASH_SIZE 1024
+#define PCS_HOST_HASH_SIZE 64
 
 struct pcs_cs_set {
 	struct hlist_head	ht[PCS_CS_HASH_SIZE];
@@ -138,6 +141,16 @@ struct pcs_cs_set {
 	unsigned int		ncs;
 	spinlock_t		lock;
 	atomic64_t		csl_serno_gen;
+	atomic_t		cs_addr_change_sn;
+	atomic_t		offline_host_sn;
+	atomic_t		ill_cs_sn;
+	struct hlist_head	hht[PCS_HOST_HASH_SIZE];
+};
+
+struct pcs_host {
+	struct hlist_node	link;
+	struct rcu_head		rcu;
+	PCS_NODE_ID_T		host_id;
 };
 
 void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq);
@@ -220,4 +233,10 @@ static inline bool cs_is_blacklisted(struct pcs_cs *cs)
 void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h);
 struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
 void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
+
+int pcs_cs_dislog_event(struct pcs_cluster_core *cc, struct pcs_mds_cached_event *evt);
+int pcs_dislog_is_host_down(struct pcs_cs_set *css, PCS_NODE_ID_T host_id);
+void pcs_dislog_host_add(struct pcs_cs_set *css, u64 host_id);
+void pcs_dislog_host_del(struct pcs_cs_set *css, u64 host_id);
+
 #endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index a0452bad..4e911eb 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -1773,6 +1773,26 @@ static void kpcs_kill_requests(struct fuse_conn *fc, struct inode *inode)
 	pcs_kio_file_list(fc, kpcs_kill_lreq_itr, inode);
 }
 
+static int pcs_process_dislog(struct pcs_cluster_core *cc, struct pcs_mds_cached_event *evt)
+{
+	switch (evt->flags & PCS_MDS_EVT_F_OBJ_MASK) {
+	case PCS_MDS_EVT_F_OBJ_CS:
+		return pcs_cs_dislog_event(cc, evt);
+	case PCS_MDS_EVT_F_OBJ_HOST:
+		switch (evt->flags & PCS_MDS_EVT_F_TYPE_MASK) {
+		case PCS_MDS_EVT_F_DOWN:
+			pcs_dislog_host_add(&cc->css, dislog_evt_obj_id(evt));
+			atomic_inc(&cc->css.offline_host_sn);
+			break;
+		case PCS_MDS_EVT_F_UP:
+			pcs_dislog_host_del(&cc->css, dislog_evt_obj_id(evt));
+			break;
+		}
+		return 0;
+	}
+	return -EINVAL;
+}
+
 static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len)
 {
 	struct fuse_conn * fc = NULL;
@@ -1953,6 +1973,19 @@ static int kpcs_dev_ioctl(struct fuse_conn *fc, unsigned int cmd, unsigned long
 		res = pcs_dereg_mr(&cc->mrs, req.id);
 		break;
 	}
+	case PCS_IOC_DISLOG:
+	{
+		struct pcs_mds_cached_event ev;
+
+		if (len < sizeof(ev))
+			return -EINVAL;
+
+		if (copy_from_user(&ev, (void __user *)arg, sizeof(ev)))
+			return -EFAULT;
+
+		res = pcs_process_dislog(cc, &ev);
+		break;
+	}
 	default:
 		res = -ENOIOCTLCMD;
 		break;
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index a0795d3..fb22cf8 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -64,6 +64,7 @@ struct pcs_ioc_getmap
 #define PCS_IOC_MAP_S_WRITE	0x2
 #define PCS_IOC_MAP_S_NEW	0x4
 #define PCS_IOC_MAP_S_ERROR	0x8
+#define PCS_IOC_MAP_S_EXT	0x80000000
 	pcs_error_t		error;		/* in/out */
 	u16			mds_flags;	/* in/out */
 	u32			psize_ret;	/* length of chunk on CS (out) */
@@ -152,4 +153,6 @@ struct pcs_ioc_krpc_destroy {
 };
 #define PCS_IOC_KRPC_DESTROY _IOR('V', 45, struct pcs_ioc_krpc_destroy)
 
+#define PCS_IOC_DISLOG	_IO('V', 64)
+
 #endif /* _PCS_IOCTL_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_map.c b/fs/fuse/kio/pcs/pcs_map.c
index 4aba8e7..a40cc68 100644
--- a/fs/fuse/kio/pcs/pcs_map.c
+++ b/fs/fuse/kio/pcs/pcs_map.c
@@ -744,6 +744,16 @@ static inline void map_remote_error_nolock(struct pcs_map_entry *m , int error,
 	__map_error(m, 1 , error, offender);
 }
 
+static void pcs_map_mark_stale(struct pcs_map_entry *m, struct pcs_cs *cs)
+{
+	if (!(m->state & (PCS_MAP_ERROR|PCS_MAP_DEAD|PCS_MAP_RESOLVING|PCS_MAP_NEW))) {
+		FUSE_KTRACE(cc_from_csset(cs->css)->fc,
+			    MAP_FMT " invalidating due to state of CS#"NODE_FMT,
+			    MAP_ARGS(m), NODE_ARGS(cs->id));
+		map_remote_error_nolock(m, PCS_ERR_CSD_STALE_MAP, cs->id.val);
+	}
+}
+
 void pcs_map_notify_addr_change(struct pcs_cs * cs)
 {
 	struct pcs_cs_list *cs_list, *prev_cs_list = NULL;
@@ -779,18 +789,10 @@ void pcs_map_notify_addr_change(struct pcs_cs * cs)
 		prev_cs_list = cs_list;
 
 		spin_lock(&m->lock);
-		if ((m->state & PCS_MAP_DEAD) || m->cs_list != cs_list)
-			goto unlock;
-
-		if (m->state & (PCS_MAP_ERROR|PCS_MAP_RESOLVING|PCS_MAP_NEW))
-			goto unlock;
-
-		FUSE_KTRACE(cc_from_csset(cs->css)->fc, MAP_FMT " invalidating due to address change of CS#"NODE_FMT,
-		      MAP_ARGS(m), NODE_ARGS(cs->id));
-
-		map_remote_error_nolock(m, PCS_ERR_CSD_STALE_MAP, cs->id.val);
-unlock:
+		if (!(m->state & PCS_MAP_DEAD) && m->cs_list == cs_list)
+			pcs_map_mark_stale(m, cs);
 		spin_unlock(&m->lock);
+
 		spin_lock(&cs->lock);
 	}
 
@@ -981,7 +983,8 @@ int pcs_map_encode_req(struct pcs_map_entry*m, struct pcs_ioc_getmap *map, int d
  * Alloc and initialize cslist, grab cs->lock inside
  */
 struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *rec, int cs_cnt,
-				     int read_tout, int write_tout, int error_clear)
+				     int read_tout, int write_tout, int error_clear,
+				     struct pcs_mds_map_ext *mx)
 {
 	struct pcs_cs_list * cs_list = NULL;
 	struct pcs_cs * cs;
@@ -996,6 +999,9 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 	cs_list->read_index = -1;
 	cs_list->state_flags = 0;
 	cs_list->serno = atomic64_inc_return(&css->csl_serno_gen);
+	cs_list->cs_addr_change_sn = atomic_read(&css->cs_addr_change_sn);
+	cs_list->offline_host_sn = atomic_read(&css->offline_host_sn);
+	cs_list->ill_cs_sn = atomic_read(&css->ill_cs_sn);
 	cs_list->blacklist = 0;
 	cs_list->read_timeout = (read_tout * HZ) / 1000;
 	cs_list->write_timeout = (write_tout * HZ) / 1000;
@@ -1004,6 +1010,11 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 		cs_list->cs[i].info = rec[i];
 		memset(&cs_list->cs[i].sync, 0, sizeof(cs_list->cs[i].sync));
 		cs_list->cs[i].dirty_ts = jiffies;
+		if (mx && i < mx->cs_info_count)
+			cs_list->cs[i].xinfo = *(struct pcs_cs_info_ext *)
+				((void *)mx + mx->cs_info_offset + i * mx->cs_info_size);
+		else
+			memset(&cs_list->cs[i].xinfo, 0, sizeof(struct pcs_cs_info_ext));
 		RCU_INIT_POINTER(cs_list->cs[i].cslink.cs, NULL);
 		INIT_LIST_HEAD(&cs_list->cs[i].cslink.link);
 		cs_list->cs[i].cslink.index = i;
@@ -1065,6 +1076,16 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 		else if (test_bit(CS_SF_FAILED, &cs->state))
 			clear_bit(CS_SF_FAILED, &cs->state);
 
+		if (cs->mds_flags & CS_FL_ILL)
+			set_bit(CS_SF_ILL, &cs->state);
+		else if (test_bit(CS_SF_ILL, &cs->state))
+			clear_bit(CS_SF_ILL, &cs->state);
+
+		if (cs->mds_flags & CS_FL_MAINTENANCE)
+			set_bit(CS_SF_MAINTENANCE, &cs->state);
+		else if (test_bit(CS_SF_MAINTENANCE, &cs->state))
+			clear_bit(CS_SF_MAINTENANCE, &cs->state);
+
 		list_add(&cslink->link, &cs->map_list);
 		cs->nmaps++;
 		spin_unlock(&cs->lock);
@@ -1089,7 +1110,8 @@ struct pcs_cs_list* cslist_alloc( struct pcs_cs_set *css, struct pcs_cs_info *re
 void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap)
 {
 	pcs_error_t error = omap->error;
-	struct pcs_cs_list * cs_list = NULL;
+	struct pcs_cs_list *cs_list = NULL;
+	struct pcs_mds_map_ext *mx = NULL;
 	struct list_head queue;
 	int error_sensed = 0;
 
@@ -1109,11 +1131,15 @@ void pcs_map_complete(struct pcs_map_entry *m, struct pcs_ioc_getmap *omap)
 	}
 	TRACE("dentry: "DENTRY_FMT, DENTRY_ARGS(pcs_dentry_from_map(m)));
 
+	if (omap->state & PCS_IOC_MAP_S_EXT)
+		mx = (void *)&omap->cs[omap->cs_cnt];
+
 	error_sensed = m->state & PCS_MAP_ERROR;
 
 	if (omap->cs_cnt) {
 		spin_unlock(&m->lock);
-		cs_list = cslist_alloc(&cc_from_map(m)->css, omap->cs, omap->cs_cnt, omap->read_tout, omap->write_tout, error_sensed);
+		cs_list = cslist_alloc(&cc_from_map(m)->css, omap->cs, omap->cs_cnt,
+			  omap->read_tout, omap->write_tout, error_sensed, mx);
 		spin_lock(&m->lock);
 		if (!cs_list) {
 			pcs_set_local_error(&error, PCS_ERR_NOMEM);
@@ -1792,7 +1818,9 @@ static int select_cs_for_read(struct pcs_cluster_core *cc, struct pcs_cs_list *
 		unsigned int in_flight;
 		abs_time_t io_prio_stamp;
 
-		if (failed_cnt >= 0 && ((test_bit(CS_SF_FAILED, &cs->state)) || cs->id.val == banned_cs.val)) {
+		if (failed_cnt >= 0 &&
+		    ((test_bit(CS_SF_FAILED, &cs->state) || test_bit(CS_SF_ILL, &cs->state)) ||
+		     cs->id.val == banned_cs.val)) {
 			failed_cnt++;
 			continue;
 		}
@@ -2345,8 +2373,6 @@ static int pcs_cslist_submit_flush(struct pcs_int_request *ireq, struct pcs_cs_l
 	return 0;
 }
 
-
-
 int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
 {
 	BUG_ON(!atomic_read(&csl->refcnt));
@@ -2365,6 +2391,47 @@ int pcs_cslist_submit(struct pcs_int_request *ireq, struct pcs_cs_list *csl)
 	return -EIO;
 }
 
+static int map_chk_stale(struct pcs_map_entry *m)
+{
+	struct pcs_cluster_core *cc = cc_from_map(m);
+	struct pcs_cs_list *csl = m->cs_list;
+	int i, next_cs_addr_change_sn, next_offline_host_sn, next_ill_cs_sn;
+
+	if (!csl)
+		return 0;
+
+	next_cs_addr_change_sn = atomic_read(&cc->css.cs_addr_change_sn);
+	next_offline_host_sn = atomic_read(&cc->css.offline_host_sn);
+	next_ill_cs_sn = atomic_read(&cc->css.ill_cs_sn);
+
+	if (csl->cs_addr_change_sn == next_cs_addr_change_sn &&
+	    csl->offline_host_sn == next_offline_host_sn &&
+	    csl->ill_cs_sn == next_ill_cs_sn)
+		return 0;
+
+	for (i = 0; i < csl->nsrv; i++) {
+		if (csl->cs[i].cslink.cs->addr_serno != csl->cs[i].cslink.addr_serno) {
+			WARN_ON(csl->cs_addr_change_sn == next_cs_addr_change_sn);
+			pcs_map_mark_stale(m, csl->cs[i].cslink.cs);
+			return -1;
+		}
+		if (csl->offline_host_sn != next_offline_host_sn &&
+		    pcs_dislog_is_host_down(&cc->css, csl->cs[i].xinfo.host_id)) {
+			pcs_map_mark_stale(m, csl->cs[i].cslink.cs);
+			return -1;
+		}
+		if ((m->state & PCS_MAP_WRITEABLE) &&
+		    test_bit(CS_SF_ILL, &csl->cs[i].cslink.cs->state)) {
+			pcs_map_mark_stale(m, csl->cs[i].cslink.cs);
+			return -1;
+		}
+	}
+	csl->cs_addr_change_sn = next_cs_addr_change_sn;
+	csl->offline_host_sn = next_offline_host_sn;
+	csl->ill_cs_sn = next_ill_cs_sn;
+	return 0;
+}
+
 void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 {
 	int direction;
@@ -2383,7 +2450,8 @@ void map_submit(struct pcs_map_entry * m, struct pcs_int_request *ireq)
 		if (ireq->type == PCS_IREQ_IOCHUNK && !(ireq->flags & IREQ_F_MAPPED))
 			ireq->iochunk.hbuf.map_version = m->version;
 
-		if (!(m->state & (1 << direction)) || m->state & PCS_MAP_DEAD) {
+		if (!(m->state & (1 << direction)) || (m->state & PCS_MAP_DEAD) ||
+		    map_chk_stale(m)) {
 			spin_unlock(&m->lock);
 			pcs_map_queue_resolve(m, ireq, direction);
 			return;
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index b8416712..e473a26 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -102,6 +102,7 @@ struct pcs_cs_record
 	struct cs_sync_state	sync;
 	abs_time_t		dirty_ts;
 	unsigned long		flags;
+	struct pcs_cs_info_ext	xinfo;
 	struct pcs_cs_link	cslink;
 };
 
@@ -123,6 +124,9 @@ struct pcs_cs_list
 #define CSL_SF_HAS_REPLICATING	1
 	/* members below are immutable accross cslist life time */
 	u64                     serno;
+	int			cs_addr_change_sn;
+	int			offline_host_sn;
+	int			ill_cs_sn;
 	int			read_timeout;
 	int			write_timeout;
 	int			nsrv;
diff --git a/fs/fuse/kio/pcs/pcs_mds_prot.h b/fs/fuse/kio/pcs/pcs_mds_prot.h
index f1cc23c..f723a70 100644
--- a/fs/fuse/kio/pcs/pcs_mds_prot.h
+++ b/fs/fuse/kio/pcs/pcs_mds_prot.h
@@ -71,9 +71,11 @@ struct pcs_mds_hdr
 enum {
 	CS_FL_LOCAL	  = 1,	  /* CS is on the same host as the client */
 	CS_FL_LOCAL_SOCK  = 2,	  /* CS listens on local socket */
+	CS_FL_MAINTENANCE = 8,	  /* CS in maintenance mode */
 	CS_FL_INACTIVE	  = 0x10, /* CS is not sending pings for some time */
 	CS_FL_REPLICATING = 0x20, /* This CS is replicating this map */
 	CS_FL_FAILED	  = 0x40, /* This CS has failed */
+	CS_FL_ILL	  = 0x80, /* This CS is marked as ill, client should avoid accessing it */
 	CS_FL_ROLE	  = 0xFF00,/* Role of this CS in raid array, 0..depth-1 are data chunks, the rest are syndrome */
 	CS_FL_ROLE_LOG	  = 8,
 };
@@ -97,7 +99,24 @@ struct pcs_cs_info {
 	u32			reserved;
 	/* Primary network address */
 	PCS_NET_ADDR_T		addr;
-} __attribute__((aligned(8)));
+} __aligned(8);
+
+struct pcs_cs_info_ext {
+	PCS_NODE_ID_T		host_id;        /* CS host id */
+	/* ... */                               /* may be extended in the future */
+} __aligned(8);
+
+struct pcs_mds_map_ext {
+	u32                     size;           /* total size of the structure */
+	u32                     flags;          /* flags */
+	u16                     reserved;
+	u16                     cs_info_offset; /* offset of the cs_info array in bytes */
+	u16                     cs_info_count;  /* the number of elements in cs_info array */
+	u16                     cs_info_size;   /* the size of each cs_info element */
+	struct pcs_cached_epoch de_epoch;       /* the disruption event log epoch */
+	/* ... */                               /* may be extended in the future */
+	struct pcs_cs_info_ext  cs_info[];      /* CS info array */
+} __aligned(8);
 
 struct pcs_cs_addr_info
 {
@@ -105,7 +124,58 @@ struct pcs_cs_addr_info
 	PCS_INTEGRITY_SEQ_T	integrity_seq;
 	u32			naddr;
 	PCS_NET_ADDR_T		addr[1];
-} __attribute__((aligned(8)));
+} __aligned(8);
+
+/* Object flags. They have two parts - object type and event type */
+enum {
+	/* Object type.
+	 * This is the type of the object whose id is encoded in the event's node_id field.
+	 */
+	PCS_MDS_EVT_F_OBJ_MASK  = 0xe0,
+	PCS_MDS_EVT_F_OBJ_CS    = 0x20,
+	PCS_MDS_EVT_F_OBJ_HOST  = 0x80,
+
+	/* Event type */
+	PCS_MDS_EVT_F_TYPE_MASK = 0x1f,
+
+	PCS_MDS_EVT_F_DOWN      = 1,
+	PCS_MDS_EVT_F_ADDR_CHANGED,
+
+	/* Events having this flag do not necessarily require actions.
+	 * They may be ignored or used to update cached information.
+	 */
+	PCS_MDS_EVT_F_TYPE_SOFT = 0x10,
+	PCS_MDS_EVT_F_UP = PCS_MDS_EVT_F_TYPE_SOFT,
+	PCS_MDS_EVT_F_MAINTENANCE_IN,
+	PCS_MDS_EVT_F_MAINTENANCE_OUT,
+	PCS_MDS_EVT_F_ILL_IN,
+	PCS_MDS_EVT_F_ILL_OUT,
+};
+
+/* Event log type. Currently we have only one, but more types may be added in the future. */
+enum {
+	PCS_MDS_EVT_LOG_DISRUPTION = 1
+	/* ... */
+};
+
+struct pcs_mds_cached_event {
+	union {
+		struct {
+			u8 node_id[7];
+			u8 flags; /* PCS_MDS_EVT_F_XXX */
+		};
+		u64 tag;
+	};
+} __aligned(8);
+
+static inline u64 dislog_evt_obj_id(struct pcs_mds_cached_event *evt)
+{
+	struct pcs_mds_cached_event e;
+
+	e.tag = evt->tag;
+	e.flags = 0;
+	return e.tag;
+}
 
 /* ---- connection request
  * The following structure serves as a payload for RPC connect messages to deliver MDS server list to the client.
diff --git a/fs/fuse/kio/pcs/pcs_prot_types.h b/fs/fuse/kio/pcs/pcs_prot_types.h
index def0073..64718d5 100644
--- a/fs/fuse/kio/pcs/pcs_prot_types.h
+++ b/fs/fuse/kio/pcs/pcs_prot_types.h
@@ -85,6 +85,11 @@ struct __pre_aligned(8) pcs_host_info {
 typedef u64 PCS_LEASE_GEN_T;
 typedef u32 PCS_POLICY_GEN_T;
 
+struct __pre_aligned(8) pcs_cached_epoch {
+	PCS_MASTER_GENID_T master;
+	u32 sn;
+} __aligned(8);
+
 typedef union {
 	struct {
 		u32 major;
-- 
1.8.3.1


