[Devel] [PATCH VZ9] fs/fuse kio: fixed krpc abort, implement proper request msg cancellation

Alexey Kuznetsov kuznet at virtuozzo.com
Thu Aug 1 16:15:37 MSK 2024


Ack

On Thu, Aug 1, 2024 at 12:12 AM Liu Kui <kui.liu at virtuozzo.com> wrote:
>
> When userspace issues the krpc abort ioctl cmd, we need to make sure
> that all pending msgs have been cancelled properly, so that the kernel
> can no longer access memory buffers from userspace, before we return
> to userspace. However, if buffers are being used for IO, the msg
> cannot be cancelled, and we must wait until the IO completes.
>
> Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
> ---
>  fs/fuse/kio/pcs/pcs_krpc.c    | 74 +++++++++++++++++++++++++++--------
>  fs/fuse/kio/pcs/pcs_rpc.c     | 46 ++++++++++++++++++++--
>  fs/fuse/kio/pcs/pcs_rpc.h     |  1 +
>  fs/fuse/kio/pcs/pcs_sock_io.h |  3 +-
>  4 files changed, 103 insertions(+), 21 deletions(-)
>
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> index e2391571561a..f62bf6ef11aa 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.c
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -4,10 +4,10 @@
>
>  #include <linux/types.h>
>  #include <linux/list.h>
> -#include <linux/rbtree.h>
>  #include <linux/refcount.h>
>  #include <linux/file.h>
>  #include <linux/anon_inodes.h>
> +#include <linux/delay.h>
>
>  #include "pcs_types.h"
>  #include "pcs_cluster.h"
> @@ -89,9 +89,9 @@ static void krpc_req_complete(struct krpc_req *kreq, int error)
>         spin_lock(&krpc->lock);
>         list_del(&kreq->link);
>
> -       if (kreq->flags & KRPC_REQ_F_ABORTED) {
> -               krpc_req_free(kreq);
> +       if (unlikely(kreq->flags & KRPC_REQ_F_ABORTED)) {
>                 spin_unlock(&krpc->lock);
> +               krpc_req_free(kreq);
>         } else if (krpc_completion_post(krpc, comp)) {
>                 krpc_req_free(kreq);
>         }
> @@ -166,6 +166,7 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
>         resp->get_iter = krpc_msg_get_response_iter;
>         resp->done = rpc_work_input;
>         pcs_msg_del_calendar(msg);
> +       msg->stage = PCS_MSG_STAGE_RECV;
>
>         return resp;
>  }
> @@ -464,11 +465,11 @@ static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_se
>
>  static int pcs_krpc_abort(struct pcs_krpc *krpc)
>  {
> -       struct list_head dispose_list;
> -       struct krpc_req *kreq;
> +       struct krpc_req *kreq, *tmp;
>         struct krpc_completion *comp;
> -
> -       INIT_LIST_HEAD(&dispose_list);
> +       struct pcs_rpc *ep = krpc->rpc;
> +       struct pcs_msg *msg;
> +       int timeout = 1000;     /* 10 ms */
>
>         spin_lock(&krpc->lock);
>
> @@ -479,23 +480,64 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
>
>         krpc->state = PCS_KRPC_STATE_ABORTED;
>
> +       /* dispose all unprocessed completions */
> +       while (!list_empty(&krpc->completion_queue)) {
> +               comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
> +               list_del(&comp->link);
> +               krpc_completion_free(comp);
> +       }
> +       krpc->nr_completion = 0;
> +
>         /* abort incompleted requests */
>         list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
> -       list_for_each_entry(kreq, &krpc->dispose_queue, link)
> +       spin_unlock(&krpc->lock);
> +
> +       /* nothing to be done */
> +       if (list_empty(&krpc->dispose_queue))
> +               return 0;
> +
> +       /* abort incomplete requests */
> +       mutex_lock(&ep->mutex);
> +       list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
>                 kreq->flags |= KRPC_REQ_F_ABORTED;
> +               msg = &kreq->msg;
> +               /* if msg is cancelled, kreq will be removed from the queue */
> +               pcs_rpc_cancel_msg(msg);
> +       }
>
> -       list_splice_tail_init(&krpc->completion_queue, &dispose_list);
> -       krpc->nr_completion = 0;
> +       /*
> +        * The krpc->dispose_queue should be empty if there are no requests in
> +        * busy state. Otherwise wait until all busy requests complete. This
> +        * should be an extremely rare case, therefore sleep is acceptable here.
> +        *
> +        * We cannot keep references to busy requests while waiting, because
> +        * busy requests could have been freed.
> +        */
> +       while (!list_empty(&krpc->dispose_queue)) {
> +               kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
> +               msg = &kreq->msg;
> +
> +               /* no longer busy and cancelled */
> +               if (!pcs_rpc_cancel_msg(msg))
> +                       continue;
> +
> +               /* seems something went wrong with the hardware, abort the rpc */
> +               if (timeout == 0) {
> +                       rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
> +                       break;
> +               }
> +               mutex_unlock(&ep->mutex);
>
> -       spin_unlock(&krpc->lock);
> +               /* busy-wait 10 us (udelay does not sleep) */
> +               udelay(10);
> +               timeout--;
>
> -       /* dispose all unprocessed completions */
> -       while (!list_empty(&dispose_list)) {
> -               comp = list_first_entry(&dispose_list, struct krpc_completion, link);
> -               list_del(&comp->link);
> -               krpc_completion_free(comp);
> +               /* check again */
> +               mutex_lock(&ep->mutex);
>         }
>
> +       mutex_unlock(&ep->mutex);
> +
>         return 0;
>  }
>
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
> index f990f0f30609..80bbffcb88a8 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.c
> +++ b/fs/fuse/kio/pcs/pcs_rpc.c
> @@ -561,6 +561,44 @@ void pcs_rpc_cancel_request(struct pcs_msg * msg)
>         msg->done(msg);
>  }
>
> +int pcs_rpc_cancel_msg(struct pcs_msg *msg)
> +{
> +       struct pcs_rpc *ep = msg->rpc;
> +
> +       BUG_ON(!ep);
> +       BUG_ON(!mutex_is_locked(&ep->mutex));
> +
> +       switch (msg->stage) {
> +       case PCS_MSG_STAGE_SEND:
> +               if (msg->netio->tops->cancel_msg(msg))
> +                       /*
> +                        * Request is under network IO right now, cannot be cancelled as its
> +                        * buffer could be in use.
> +                        */
> +                       return -EBUSY;
> +               break;
> +       case PCS_MSG_STAGE_RECV:
> +               /*
> +                * Response is under network IO right now, the request message cannot
> +                * be cancelled as its buffer could be in use.
> +                */
> +               return -EBUSY;
> +       default:
> +               break;
> +       }
> +
> +       /* msg could be in ep->input_queue*/
> +       spin_lock(&ep->q_lock);
> +       list_del(&msg->list);
> +       spin_unlock(&ep->q_lock);
> +
> +       pcs_msg_del_calendar(msg);
> +       msg->stage = PCS_MSG_STAGE_NONE;
> +       msg->done(msg);
> +
> +       return 0;
> +}
> +
>  void rpc_work_input(struct pcs_msg * msg)
>  {
>         struct pcs_rpc * ep = msg->rpc;
> @@ -900,14 +938,14 @@ static void rpc_queue_work(struct work_struct *w)
>         int repeat;
>
>  again:
> -       spin_lock(&ep->q_lock);
> -       list_splice_tail_init(&ep->input_queue, &input_q);
> -       spin_unlock(&ep->q_lock);
> -
>         mutex_lock(&ep->mutex);
>
>         TRACE("Handle queues\n");
>
> +       spin_lock(&ep->q_lock);
> +       list_splice_tail_init(&ep->input_queue, &input_q);
> +       spin_unlock(&ep->q_lock);
> +
>         /* Process messages which are already in the sock queue */
>         if (ep->state == PCS_RPC_WORK) {
>                 struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
> index fe30bede7efe..9a651a812cf7 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.h
> +++ b/fs/fuse/kio/pcs/pcs_rpc.h
> @@ -324,5 +324,6 @@ static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
>  const char* pcs_rpc_state_name(unsigned state);
>
>  void pcs_rpc_report_error(struct pcs_rpc *ep, unsigned int err);
> +int pcs_rpc_cancel_msg(struct pcs_msg *msg);
>
>  #endif /* _PCS_RPC_H_ */
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
> index 274eebd94bfc..872faffefe01 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.h
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.h
> @@ -89,7 +89,8 @@ enum
>         PCS_MSG_STAGE_SEND      = 2,    /* Message queued on socket queue */
>         PCS_MSG_STAGE_SENT      = 3,    /* Message is sent */
>         PCS_MSG_STAGE_WAIT      = 4,    /* Message is waiting for respnose */
> -       PCS_MSG_STAGE_DONE      = 5,    /* Response received */
> +       PCS_MSG_STAGE_RECV      = 5,    /* Response is being received */
> +       PCS_MSG_STAGE_DONE      = 6,    /* Response received */
>  };
>
>  enum
> --
> 2.39.3 (Apple Git-146)



More information about the Devel mailing list