[Devel] [PATCH VZ9] fs/fuse kio: fix krpc abort

Liu Kui kui.liu at virtuozzo.com
Thu Apr 17 07:57:23 MSK 2025


It's never a good idea to take the kernel rpc mutex lock in the ioctl
path from userspace, because it could block the calling thread for a
very long time, especially while kernel rpc is trying to establish a
connection.

Currently krpc abort has to take the mutex lock because it wants to
clean up all aborting msgs from kernel rpc, which is not easy to do
without holding the mutex lock. So this approach is probably not
fixable.

So this patch takes a different approach to krpc abort. Two flag
bits, PCS_MSG_BUSY and PCS_MSG_ABORTED, are introduced. Instead of
removing aborting msgs from kernel rpc, krpc abort now only sets the
PCS_MSG_ABORTED flag bit on each aborting msg, preventing netio from
accessing the associated user buffers. It still has to wait while the
PCS_MSG_BUSY flag bit is set, which means the aborting msg is currently
under I/O by netio. The PCS_MSG_BUSY flag bit is very unlikely to stay
set for a long time, so krpc abort is unlikely to be blocked for long.

Fixes: #VSTOR-104248
https://virtuozzo.atlassian.net/browse/VSTOR-104248

Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_cs.c      |  1 +
 fs/fuse/kio/pcs/pcs_krpc.c    | 66 ++++++++++++-----------------------
 fs/fuse/kio/pcs/pcs_rdma_io.c | 14 ++++++++
 fs/fuse/kio/pcs/pcs_rpc.c     |  2 ++
 fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++-
 fs/fuse/kio/pcs/pcs_sock_io.h | 14 ++++++++
 6 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index ad398acb03ef..299ece862cd3 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	pcs_clear_error(&msg->error);
 	msg->done = cs_sent;
 	msg->get_iter = aligned_msg ? cs_get_data_aligned : cs_get_data;
+	msg->flags = 0;
 
 	if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) {
 		ireq->error.value = PCS_ERR_CSD_STALE_MAP;
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 087d1d966e0d..3e404a2d0bde 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
 
 	kreq = msg->private2;
 
+	set_bit(PCS_MSG_BUSY, &msg->flags);
+
+	/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+	smp_mb();
+
+	if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+		pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+		pcs_msg_del_calendar(msg);
+		list_del(&msg->list);
+		msg->stage = PCS_MSG_STAGE_DONE;
+		msg->done(msg);
+		return NULL;
+	}
+
 	resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
 	if (!resp)
 		return NULL;
@@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg)
 {
 	struct krpc_req *kreq = msg->private2;
 
+	clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
 	if (msg->rpc) {
 		pcs_rpc_put(msg->rpc);
 		msg->rpc = NULL;
@@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq)
 	msg->rpc = NULL;
 	msg->done = pcs_krpc_msg_sent;
 	msg->get_iter = krpc_msg_get_data;
+	msg->flags = 0;
 
 	spin_lock(&krpc->lock);
 	if (krpc->state != PCS_KRPC_STATE_CONNECTED ||
@@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
 	struct krpc_req *kreq, *tmp;
 	struct krpc_completion *comp;
-	struct pcs_rpc *ep = krpc->rpc;
 	struct pcs_msg *msg;
-	int timeout = 1000;	/* 10 ms */
 
 	spin_lock(&krpc->lock);
 
@@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 	list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
 	spin_unlock(&krpc->lock);
 
-	/* nothing to be done */
-	if (list_empty(&krpc->dispose_queue))
-		return 0;
-
-	/* abort incomplete requests */
-	mutex_lock(&ep->mutex);
 	list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
 		kreq->flags |= KRPC_REQ_F_ABORTED;
 		msg = &kreq->msg;
-		/* if msg is cancelled, kreq will be removed from the queue */
-		pcs_rpc_cancel_msg(ep, msg);
+		/*
+		 * The msg isn't freed immediately here however the user buffer
+		 * won't be accessed.
+		 */
+		pcs_msg_abort(msg, true);
 	}
 
-	/*
-	 * The krpc->dispose_queue should be empty if there are no requests in
-	 * busy state. Otherwise wait until all busy requests complete. This
-	 * should be a extremely rare case, therefore sleep is acceptable here.
-	 *
-	 * We cannot keep references to busy requests while waiting, because
-	 * busy requests could have been freed.
-	 */
-	while (!list_empty(&krpc->dispose_queue)) {
-		kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
-		msg = &kreq->msg;
-
-		/* no longer busy and cancelled */
-		if (!pcs_rpc_cancel_msg(ep, msg))
-			continue;
-
-		/* seems somthing wrong happened to hardware, abort the rpc */
-		if (timeout == 0) {
-			rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
-			break;
-		}
-		mutex_unlock(&ep->mutex);
-
-		/* sleep 10 us */
-		udelay(10);
-		timeout--;
-
-		/* check again */
-		mutex_lock(&ep->mutex);
-	}
-
-	mutex_unlock(&ep->mutex);
-
 	return 0;
 }
 
@@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
 	msg = &connect_req->msg;
 	msg->size = 0;
 	msg->timeout = 0;
+	msg->flags = 0;
 	msg->rpc = NULL;
 	msg->done = krpc_connect_done;
 	pcs_clear_error(&msg->error);
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
index d50f2c1e97e3..1d9e648d2636 100644
--- a/fs/fuse/kio/pcs/pcs_rdma_io.c
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct pcs_rdmaio *rio)
 static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct pcs_msg *msg, int done)
 {
 	if (done) {
+		clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
 		pcs_msg_sent(msg);
 		msg->done(msg);
 	} else {
@@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct pcs_msg *msg, int type, u64
 	int offset = 0;
 	struct iov_iter it;
 
+	if (msg) {
+		set_bit(PCS_MSG_BUSY, &msg->flags);
+
+		/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+		smp_mb();
+
+		if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+			pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+			rio_msg_sent(rio, tx, msg, 1);
+			return 0;
+		}
+	}
+
 	tx = RE_NULL(rio_get_tx(dev));
 	if (!tx) {
 		if (allow_again)
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 71c2a3b54da7..f15d0c3fb7cd 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct pcs_msg * msg, int accou
 	INIT_HLIST_NODE(&msg->kill_link);
 	pcs_rpc_account_msg(ep, msg, account);
 	msg->destructor = pcs_rpc_input_destructor;
+	msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen)
@@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg)
 	msg->rpc = NULL;
 	INIT_HLIST_NODE(&msg->kill_link);
 	msg->destructor = pcs_msg_output_destructor;
+	msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen)
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 805b8f1e56b0..7eb231b7260d 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg)
 	}
 }
 
+/* Mark @msg aborted; wait for PCS_MSG_BUSY to clear, or -EBUSY if !@wait. */
+int pcs_msg_abort(struct pcs_msg *msg, bool wait)
+{
+	set_bit(PCS_MSG_ABORTED, &msg->flags);
+
+	/* pair with the smp_mb() between set PCS_MSG_BUSY and test PCS_MSG_ABORTED */
+	smp_mb();
+
+	if (!wait)
+		return test_bit(PCS_MSG_BUSY, &msg->flags) ? -EBUSY : 0;
+	/*
+	 * Sleep uninterruptibly: an interruptible wait returns immediately
+	 * on a pending signal, and re-testing the bit would then busy-spin.
+	 */
+	return wait_on_bit(&msg->flags, PCS_MSG_BUSY, TASK_UNINTERRUPTIBLE);
+}
+
 static void sio_push(struct pcs_sockio * sio)
 {
 	TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
@@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 			return;
 		}
 
+		set_bit(PCS_MSG_BUSY, &msg->flags);
+
+		/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+		smp_mb();
+
+		/* Shouldn't abort a half sent message  */
+		if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && !sio->write_offset) {
+			pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+			goto skip_send;
+		}
+
 		/* TODO: cond resched here? */
 		while (sio->write_offset < msg->size) {
 			size_t left = msg->size - sio->write_offset;
@@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 				return;
 			}
 		}
+
+skip_send:
+		clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
 		list_del_init(&msg->list);
 		sio->write_queue_len -= msg->size;
 
@@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * sio, int datalen)
 
 	msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO);
 	if (msg) {
-
 		pcs_msg_io_init(msg);
 		pcs_account_msg(sio, msg);
 		msg->destructor = pcs_msg_input_destructor;
@@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg)
 		clone->destructor = pcs_io_msg_output_destructor;
 		clone->private = msg;
 		clone->get_iter = get_iter_clone;
+		clone->flags = 0;
 	}
 	return clone;
 }
@@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int copy_len)
 		clone->_inline_len = (short)copy_len;
 		memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len);
 		clone->get_iter = get_iter_cow_clone;
+		clone->flags = 0;
 	}
 	return clone;
 }
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 09870b38cdad..b14e992a22d3 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -35,6 +35,17 @@ struct pcs_api_channel
 	unsigned	msg_count;
 };
 
+/**
+ * pcs_msg flags: bit numbers used with set_bit()/test_bit() on pcs_msg::flags
+ *
+ * PCS_MSG_BUSY:	set while netio is doing I/O on the msg
+ * PCS_MSG_ABORTED:	set by pcs_msg_abort(); netio won't start I/O on the msg
+ */
+enum pcs_msg_flag {
+	PCS_MSG_BUSY,
+	PCS_MSG_ABORTED,
+};
+
 __pre_packed struct pcs_msg
 {
 	struct __pre_aligned(16) {
@@ -58,6 +69,8 @@ __pre_packed struct pcs_msg
 		unsigned char	stage;
 		abs_time_t	io_start_time;
 
+		unsigned long flags;
+
 		struct hlist_node	kill_link;
 
 		void		(*get_iter)(struct pcs_msg *, int offset, struct iov_iter *it,
@@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
 }
 void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
 void pcs_msg_sent(struct pcs_msg * msg);
+int pcs_msg_abort(struct pcs_msg *msg, bool wait);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {
-- 
2.39.5 (Apple Git-154)



More information about the Devel mailing list