[Devel] [PATCH VZ9] fs/fuse kio: fix krpc abort, implement proper request msg cancellation

Liu Kui kui.liu at virtuozzo.com
Wed Jul 31 18:51:31 MSK 2024


When userspace issues the krpc abort ioctl cmd, we must make sure
that all pending msgs have been cancelled properly, so that the
kernel can no longer access userspace memory buffers, before we
return to userspace. However, if a msg's buffers are currently in
use for IO, the msg cannot be cancelled and we must wait until the
IO completes.
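
The resulting pcs_krpc_abort() marks every pending request as aborted,
tries to cancel its msg and, if a msg is still busy with network IO,
polls under the rpc mutex until the IO drains or a ~10 ms budget runs
out, at which point the whole rpc is aborted. A condensed sketch of
that flow, taken from the diff below (the completion-queue cleanup and
the locking around the pending queue are omitted):

	mutex_lock(&ep->mutex);
	list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
		kreq->flags |= KRPC_REQ_F_ABORTED;
		/* on success the msg's ->done() unlinks kreq from the queue */
		pcs_rpc_cancel_msg(&kreq->msg);
	}

	/* requests whose buffers are still under network IO stay queued */
	while (!list_empty(&krpc->dispose_queue)) {
		kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
		if (!pcs_rpc_cancel_msg(&kreq->msg))
			continue;	/* IO finished, msg cancelled */
		if (timeout == 0) {	/* hardware looks stuck */
			rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
			break;
		}
		mutex_unlock(&ep->mutex);
		udelay(10);		/* 1000 iterations * 10 us = 10 ms budget */
		timeout--;
		mutex_lock(&ep->mutex);
	}
	mutex_unlock(&ep->mutex);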

Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
---
 fs/fuse/kio/pcs/pcs_krpc.c    | 74 +++++++++++++++++++++++++++--------
 fs/fuse/kio/pcs/pcs_rpc.c     | 46 ++++++++++++++++++++--
 fs/fuse/kio/pcs/pcs_rpc.h     |  1 +
 fs/fuse/kio/pcs/pcs_sock_io.h |  3 +-
 4 files changed, 103 insertions(+), 21 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index e2391571561a..f62bf6ef11aa 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -4,10 +4,10 @@
 
 #include <linux/types.h>
 #include <linux/list.h>
-#include <linux/rbtree.h>
 #include <linux/refcount.h>
 #include <linux/file.h>
 #include <linux/anon_inodes.h>
+#include <linux/delay.h>
 
 #include "pcs_types.h"
 #include "pcs_cluster.h"
@@ -89,9 +89,9 @@ static void krpc_req_complete(struct krpc_req *kreq, int error)
 	spin_lock(&krpc->lock);
 	list_del(&kreq->link);
 
-	if (kreq->flags & KRPC_REQ_F_ABORTED) {
-		krpc_req_free(kreq);
+	if (unlikely(kreq->flags & KRPC_REQ_F_ABORTED)) {
 		spin_unlock(&krpc->lock);
+		krpc_req_free(kreq);
 	} else if (krpc_completion_post(krpc, comp)) {
 		krpc_req_free(kreq);
 	}
@@ -166,6 +166,7 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
 	resp->get_iter = krpc_msg_get_response_iter;
 	resp->done = rpc_work_input;
 	pcs_msg_del_calendar(msg);
+	msg->stage = PCS_MSG_STAGE_RECV;
 
 	return resp;
 }
@@ -464,11 +465,11 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
 
 static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
-	struct list_head dispose_list;
-	struct krpc_req *kreq;
+	struct krpc_req *kreq, *tmp;
 	struct krpc_completion *comp;
-
-	INIT_LIST_HEAD(&dispose_list);
+	struct pcs_rpc *ep = krpc->rpc;
+	struct pcs_msg *msg;
+	int timeout = 1000;	/* 1000 * 10 us = 10 ms */
 
 	spin_lock(&krpc->lock);
 
@@ -479,23 +480,64 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 
 	krpc->state = PCS_KRPC_STATE_ABORTED;
 
+	/* dispose all unprocessed completions */
+	while (!list_empty(&krpc->completion_queue)) {
+		comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
+		list_del(&comp->link);
+		krpc_completion_free(comp);
+	}
+	krpc->nr_completion = 0;
+
 	/* abort incompleted requests */
 	list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
-	list_for_each_entry(kreq, &krpc->dispose_queue, link)
+	spin_unlock(&krpc->lock);
+
+	/* nothing to be done */
+	if (list_empty(&krpc->dispose_queue))
+		return 0;
+
+	/* try to cancel each aborted request */
+	mutex_lock(&ep->mutex);
+	list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
 		kreq->flags |= KRPC_REQ_F_ABORTED;
+		msg = &kreq->msg;
+		/* if msg is cancelled, kreq will be removed from the queue */
+		pcs_rpc_cancel_msg(msg);
+	}
 
-	list_splice_tail_init(&krpc->completion_queue, &dispose_list);
-	krpc->nr_completion = 0;
+	/*
+	 * The krpc->dispose_queue should be empty if there are no requests in
+	 * busy state. Otherwise wait until all busy requests complete. This
+	 * should be an extremely rare case, therefore a short wait is acceptable here.
+	 *
+	 * We cannot keep references to busy requests while waiting, because
+	 * they may already have been freed by the time we look again.
+	 */
+	while (!list_empty(&krpc->dispose_queue)) {
+		kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
+		msg = &kreq->msg;
+
+		/* msg is no longer busy and has been cancelled */
+		if (!pcs_rpc_cancel_msg(msg))
+			continue;
+
+		/* something seems to have gone wrong with the hardware, abort the rpc */
+		if (timeout == 0) {
+			rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
+			break;
+		}
+		mutex_unlock(&ep->mutex);
 
-	spin_unlock(&krpc->lock);
+		/* wait 10 us */
+		udelay(10);
+		timeout--;
 
-	/* dispose all unprocessed completions */
-	while (!list_empty(&dispose_list)) {
-		comp = list_first_entry(&dispose_list, struct krpc_completion, link);
-		list_del(&comp->link);
-		krpc_completion_free(comp);
+		/* check again */
+		mutex_lock(&ep->mutex);
 	}
 
+	mutex_unlock(&ep->mutex);
+
 	return 0;
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index f990f0f30609..80bbffcb88a8 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -561,6 +561,44 @@ void pcs_rpc_cancel_request(struct pcs_msg * msg)
 	msg->done(msg);
 }
 
+int pcs_rpc_cancel_msg(struct pcs_msg *msg)
+{
+	struct pcs_rpc *ep = msg->rpc;
+
+	BUG_ON(!ep);
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	switch (msg->stage) {
+	case PCS_MSG_STAGE_SEND:
+		if (msg->netio->tops->cancel_msg(msg))
+			/*
+			 * Request is under network IO right now, cannot be cancelled as its
+			 * buffer could be in use.
+			 */
+			return -EBUSY;
+		break;
+	case PCS_MSG_STAGE_RECV:
+		/*
+		 * Response is under network IO right now, the request message cannot
+		 * be cancelled as its buffer could be in use.
+		 */
+		return -EBUSY;
+	default:
+		break;
+	}
+
+	/* msg could be in ep->input_queue */
+	spin_lock(&ep->q_lock);
+	list_del(&msg->list);
+	spin_unlock(&ep->q_lock);
+
+	pcs_msg_del_calendar(msg);
+	msg->stage = PCS_MSG_STAGE_NONE;
+	msg->done(msg);
+
+	return 0;
+}
+
 void rpc_work_input(struct pcs_msg * msg)
 {
 	struct pcs_rpc * ep = msg->rpc;
@@ -900,14 +938,14 @@ static void rpc_queue_work(struct work_struct *w)
 	int repeat;
 
 again:
-	spin_lock(&ep->q_lock);
-	list_splice_tail_init(&ep->input_queue, &input_q);
-	spin_unlock(&ep->q_lock);
-
 	mutex_lock(&ep->mutex);
 
 	TRACE("Handle queues\n");
 
+	spin_lock(&ep->q_lock);
+	list_splice_tail_init(&ep->input_queue, &input_q);
+	spin_unlock(&ep->q_lock);
+
 	/* Process messages which are already in the sock queue */
 	if (ep->state == PCS_RPC_WORK) {
 		struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index fe30bede7efe..9a651a812cf7 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -324,5 +324,6 @@ static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
 const char* pcs_rpc_state_name(unsigned state);
 
 void pcs_rpc_report_error(struct pcs_rpc *ep, unsigned int err);
+int pcs_rpc_cancel_msg(struct pcs_msg *msg);
 
 #endif /* _PCS_RPC_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 274eebd94bfc..872faffefe01 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -89,7 +89,8 @@ enum
 	PCS_MSG_STAGE_SEND	= 2,	/* Message queued on socket queue */
 	PCS_MSG_STAGE_SENT	= 3,	/* Message is sent */
 	PCS_MSG_STAGE_WAIT	= 4,	/* Message is waiting for respnose */
-	PCS_MSG_STAGE_DONE	= 5,	/* Response received */
+	PCS_MSG_STAGE_RECV	= 5,	/* Response is being received */
+	PCS_MSG_STAGE_DONE	= 6,	/* Response received */
 };
 
 enum
-- 
2.39.3 (Apple Git-146)


