[Devel] [PATCH VZ9] fs/fuse kio: fixed krpc abort, implement proper request msg cancellation
Alexey Kuznetsov
kuznet at virtuozzo.com
Thu Aug 1 16:15:37 MSK 2024
Ack
On Thu, Aug 1, 2024 at 12:12 AM Liu Kui <kui.liu at virtuozzo.com> wrote:
>
> When userspace issues the krpc abort ioctl cmd, we must make sure
> that all pending msgs have been cancelled properly, so that the kernel
> can no longer access userspace memory buffers, before we return to
> userspace. However, if a msg's buffers are being used for IO, the msg
> cannot be cancelled and we must wait until the IO completes.
>
> Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
> ---
> fs/fuse/kio/pcs/pcs_krpc.c | 74 +++++++++++++++++++++++++++--------
> fs/fuse/kio/pcs/pcs_rpc.c | 46 ++++++++++++++++++++--
> fs/fuse/kio/pcs/pcs_rpc.h | 1 +
> fs/fuse/kio/pcs/pcs_sock_io.h | 3 +-
> 4 files changed, 103 insertions(+), 21 deletions(-)
>
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> index e2391571561a..f62bf6ef11aa 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.c
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -4,10 +4,10 @@
>
> #include <linux/types.h>
> #include <linux/list.h>
> -#include <linux/rbtree.h>
> #include <linux/refcount.h>
> #include <linux/file.h>
> #include <linux/anon_inodes.h>
> +#include <linux/delay.h>
>
> #include "pcs_types.h"
> #include "pcs_cluster.h"
> @@ -89,9 +89,9 @@ static void krpc_req_complete(struct krpc_req *kreq, int error)
> spin_lock(&krpc->lock);
> list_del(&kreq->link);
>
> - if (kreq->flags & KRPC_REQ_F_ABORTED) {
> - krpc_req_free(kreq);
> + if (unlikely(kreq->flags & KRPC_REQ_F_ABORTED)) {
> spin_unlock(&krpc->lock);
> + krpc_req_free(kreq);
> } else if (krpc_completion_post(krpc, comp)) {
> krpc_req_free(kreq);
> }
> @@ -166,6 +166,7 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
> resp->get_iter = krpc_msg_get_response_iter;
> resp->done = rpc_work_input;
> pcs_msg_del_calendar(msg);
> + msg->stage = PCS_MSG_STAGE_RECV;
>
> return resp;
> }
> @@ -464,11 +465,11 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
>
> static int pcs_krpc_abort(struct pcs_krpc *krpc)
> {
> - struct list_head dispose_list;
> - struct krpc_req *kreq;
> + struct krpc_req *kreq, *tmp;
> struct krpc_completion *comp;
> -
> - INIT_LIST_HEAD(&dispose_list);
> + struct pcs_rpc *ep = krpc->rpc;
> + struct pcs_msg *msg;
> + int timeout = 1000; /* max polls of 10 us each, i.e. 10 ms in total */
>
> spin_lock(&krpc->lock);
>
> @@ -479,23 +480,64 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
>
> krpc->state = PCS_KRPC_STATE_ABORTED;
>
> + /* dispose all unprocessed completions */
> + while (!list_empty(&krpc->completion_queue)) {
> + comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
> + list_del(&comp->link);
> + krpc_completion_free(comp);
> + }
> + krpc->nr_completion = 0;
> +
> /* abort incompleted requests */
> list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
> - list_for_each_entry(kreq, &krpc->dispose_queue, link)
> + spin_unlock(&krpc->lock);
> +
> + /* nothing to be done */
> + if (list_empty(&krpc->dispose_queue))
> + return 0;
> +
> + /* abort incomplete requests */
> + mutex_lock(&ep->mutex);
> + list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
> kreq->flags |= KRPC_REQ_F_ABORTED;
> + msg = &kreq->msg;
> + /* if msg is cancelled, kreq will be removed from the queue */
> + pcs_rpc_cancel_msg(msg);
> + }
>
> - list_splice_tail_init(&krpc->completion_queue, &dispose_list);
> - krpc->nr_completion = 0;
> + /*
> + * The krpc->dispose_queue should be empty if there are no requests in
> + * busy state. Otherwise wait until all busy requests complete. This
> + * should be an extremely rare case, therefore a short wait is acceptable here.
> + *
> + * We cannot keep references to busy requests while waiting, because
> + * busy requests could have been freed.
> + */
> + while (!list_empty(&krpc->dispose_queue)) {
> + kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
> + msg = &kreq->msg;
> +
> + /* no longer busy and cancelled */
> + if (!pcs_rpc_cancel_msg(msg))
> + continue;
> +
> + /* seems something went wrong with the hardware, abort the rpc */
> + if (timeout == 0) {
> + rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
> + break;
> + }
> + mutex_unlock(&ep->mutex);
>
> - spin_unlock(&krpc->lock);
> + /* busy-wait 10 us (udelay() does not sleep) */
> + udelay(10);
> + timeout--;
>
> - /* dispose all unprocessed completions */
> - while (!list_empty(&dispose_list)) {
> - comp = list_first_entry(&dispose_list, struct krpc_completion, link);
> - list_del(&comp->link);
> - krpc_completion_free(comp);
> + /* check again */
> + mutex_lock(&ep->mutex);
> }
>
> + mutex_unlock(&ep->mutex);
> +
> return 0;
> }
>
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
> index f990f0f30609..80bbffcb88a8 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.c
> +++ b/fs/fuse/kio/pcs/pcs_rpc.c
> @@ -561,6 +561,44 @@ void pcs_rpc_cancel_request(struct pcs_msg * msg)
> msg->done(msg);
> }
>
> +int pcs_rpc_cancel_msg(struct pcs_msg *msg)
> +{
> + struct pcs_rpc *ep = msg->rpc;
> +
> + BUG_ON(!ep);
> + BUG_ON(!mutex_is_locked(&ep->mutex));
> +
> + switch (msg->stage) {
> + case PCS_MSG_STAGE_SEND:
> + if (msg->netio->tops->cancel_msg(msg))
> + /*
> + * Request is under network IO right now, cannot be cancelled as its
> + * buffer could be in use.
> + */
> + return -EBUSY;
> + break;
> + case PCS_MSG_STAGE_RECV:
> + /*
> + * Response is under network IO right now, the request message cannot
> + * be cancelled as its buffer could be in use.
> + */
> + return -EBUSY;
> + default:
> + break;
> + }
> +
> + /* msg could be in ep->input_queue */
> + spin_lock(&ep->q_lock);
> + list_del(&msg->list);
> + spin_unlock(&ep->q_lock);
> +
> + pcs_msg_del_calendar(msg);
> + msg->stage = PCS_MSG_STAGE_NONE;
> + msg->done(msg);
> +
> + return 0;
> +}
> +
> void rpc_work_input(struct pcs_msg * msg)
> {
> struct pcs_rpc * ep = msg->rpc;
> @@ -900,14 +938,14 @@ static void rpc_queue_work(struct work_struct *w)
> int repeat;
>
> again:
> - spin_lock(&ep->q_lock);
> - list_splice_tail_init(&ep->input_queue, &input_q);
> - spin_unlock(&ep->q_lock);
> -
> mutex_lock(&ep->mutex);
>
> TRACE("Handle queues\n");
>
> + spin_lock(&ep->q_lock);
> + list_splice_tail_init(&ep->input_queue, &input_q);
> + spin_unlock(&ep->q_lock);
> +
> /* Process messages which are already in the sock queue */
> if (ep->state == PCS_RPC_WORK) {
> struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
> index fe30bede7efe..9a651a812cf7 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.h
> +++ b/fs/fuse/kio/pcs/pcs_rpc.h
> @@ -324,5 +324,6 @@ static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
> const char* pcs_rpc_state_name(unsigned state);
>
> void pcs_rpc_report_error(struct pcs_rpc *ep, unsigned int err);
> +int pcs_rpc_cancel_msg(struct pcs_msg *msg);
>
> #endif /* _PCS_RPC_H_ */
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
> index 274eebd94bfc..872faffefe01 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.h
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.h
> @@ -89,7 +89,8 @@ enum
> PCS_MSG_STAGE_SEND = 2, /* Message queued on socket queue */
> PCS_MSG_STAGE_SENT = 3, /* Message is sent */
> PCS_MSG_STAGE_WAIT = 4, /* Message is waiting for respnose */
> - PCS_MSG_STAGE_DONE = 5, /* Response received */
> + PCS_MSG_STAGE_RECV = 5, /* Response is being received */
> + PCS_MSG_STAGE_DONE = 6, /* Response received */
> };
>
> enum
> --
> 2.39.3 (Apple Git-146)
More information about the Devel
mailing list