[Devel] [PATCH VZ9] fs/fuse kio: fix krpc abort
Alexey Kuznetsov
kuznet at virtuozzo.com
Thu Apr 17 14:53:56 MSK 2025
Ack
On Thu, Apr 17, 2025 at 1:11 PM Liu Kui <kui.liu at virtuozzo.com> wrote:
>
> It's never a good idea to take the kernel rpc mutex lock in the path
> of an ioctl from userspace, because it could block the calling thread
> for a very long time, especially when kernel rpc is trying to
> establish a connection.
>
> Currently it needs to take the mutex lock in krpc abort because it
> wants to clean up all aborting msgs from kernel rpc, which is not easy
> to do without taking the mutex lock. So this approach is perhaps
> unfixable.
>
> So this patch tries a different approach for krpc abort. Two flag
> bits, PCS_MSG_BUSY and PCS_MSG_ABORTED, are introduced. Instead of
> removing aborting msgs from kernel rpc in krpc abort, it now only
> sets the PCS_MSG_ABORTED flag bit on each aborting msg, preventing netio
> from accessing the associated user buffers. Meanwhile it needs to wait
> if the PCS_MSG_BUSY flag bit is set, meaning the aborting msg is
> currently under I/O by netio. It's very unlikely that the PCS_MSG_BUSY
> flag bit stays set for a long time, so krpc abort is unlikely to be
> blocked for a long time.
>
> Fixes: #VSTOR-104248
> https://virtuozzo.atlassian.net/browse/VSTOR-104248
>
> Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
> ---
> fs/fuse/kio/pcs/pcs_cs.c | 1 +
> fs/fuse/kio/pcs/pcs_krpc.c | 66 ++++++++++++-----------------------
> fs/fuse/kio/pcs/pcs_rdma_io.c | 14 ++++++++
> fs/fuse/kio/pcs/pcs_rpc.c | 2 ++
> fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++-
> fs/fuse/kio/pcs/pcs_sock_io.h | 14 ++++++++
> 6 files changed, 88 insertions(+), 44 deletions(-)
>
> diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
> index ad398acb03ef..299ece862cd3 100644
> --- a/fs/fuse/kio/pcs/pcs_cs.c
> +++ b/fs/fuse/kio/pcs/pcs_cs.c
> @@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
> pcs_clear_error(&msg->error);
> msg->done = cs_sent;
> msg->get_iter = aligned_msg ? cs_get_data_aligned : cs_get_data;
> + msg->flags = 0;
>
> if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) {
> ireq->error.value = PCS_ERR_CSD_STALE_MAP;
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> index 087d1d966e0d..3e404a2d0bde 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.c
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
>
> kreq = msg->private2;
>
> + set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> + /* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
> + smp_mb();
> +
> + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
> + pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
> + pcs_msg_del_calendar(msg);
> + list_del(&msg->list);
> + msg->stage = PCS_MSG_STAGE_DONE;
> + msg->done(msg);
> + return NULL;
> + }
> +
> resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
> if (!resp)
> return NULL;
> @@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg)
> {
> struct krpc_req *kreq = msg->private2;
>
> + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
> +
> if (msg->rpc) {
> pcs_rpc_put(msg->rpc);
> msg->rpc = NULL;
> @@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq)
> msg->rpc = NULL;
> msg->done = pcs_krpc_msg_sent;
> msg->get_iter = krpc_msg_get_data;
> + msg->flags = 0;
>
> spin_lock(&krpc->lock);
> if (krpc->state != PCS_KRPC_STATE_CONNECTED ||
> @@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
> {
> struct krpc_req *kreq, *tmp;
> struct krpc_completion *comp;
> - struct pcs_rpc *ep = krpc->rpc;
> struct pcs_msg *msg;
> - int timeout = 1000; /* 10 ms */
>
> spin_lock(&krpc->lock);
>
> @@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
> list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
> spin_unlock(&krpc->lock);
>
> - /* nothing to be done */
> - if (list_empty(&krpc->dispose_queue))
> - return 0;
> -
> - /* abort incomplete requests */
> - mutex_lock(&ep->mutex);
> list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
> kreq->flags |= KRPC_REQ_F_ABORTED;
> msg = &kreq->msg;
> - /* if msg is cancelled, kreq will be removed from the queue */
> - pcs_rpc_cancel_msg(ep, msg);
> + /*
> + * The msg isn't freed immediately here however the user buffer
> + * won't be accessed.
> + */
> + pcs_msg_abort(msg, true);
> }
>
> - /*
> - * The krpc->dispose_queue should be empty if there are no requests in
> - * busy state. Otherwise wait until all busy requests complete. This
> - * should be a extremely rare case, therefore sleep is acceptable here.
> - *
> - * We cannot keep references to busy requests while waiting, because
> - * busy requests could have been freed.
> - */
> - while (!list_empty(&krpc->dispose_queue)) {
> - kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
> - msg = &kreq->msg;
> -
> - /* no longer busy and cancelled */
> - if (!pcs_rpc_cancel_msg(ep, msg))
> - continue;
> -
> - /* seems somthing wrong happened to hardware, abort the rpc */
> - if (timeout == 0) {
> - rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
> - break;
> - }
> - mutex_unlock(&ep->mutex);
> -
> - /* sleep 10 us */
> - udelay(10);
> - timeout--;
> -
> - /* check again */
> - mutex_lock(&ep->mutex);
> - }
> -
> - mutex_unlock(&ep->mutex);
> -
> return 0;
> }
>
> @@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
> msg = &connect_req->msg;
> msg->size = 0;
> msg->timeout = 0;
> + msg->flags = 0;
> msg->rpc = NULL;
> msg->done = krpc_connect_done;
> pcs_clear_error(&msg->error);
> diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
> index d50f2c1e97e3..1d9e648d2636 100644
> --- a/fs/fuse/kio/pcs/pcs_rdma_io.c
> +++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
> @@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct pcs_rdmaio *rio)
> static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct pcs_msg *msg, int done)
> {
> if (done) {
> + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
> pcs_msg_sent(msg);
> msg->done(msg);
> } else {
> @@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct pcs_msg *msg, int type, u64
> int offset = 0;
> struct iov_iter it;
>
> + if (msg) {
> + set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> + /* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
> + smp_mb();
> +
> + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
> + pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
> + rio_msg_sent(rio, tx, msg, 1);
> + return 0;
> + }
> + }
> +
> tx = RE_NULL(rio_get_tx(dev));
> if (!tx) {
> if (allow_again)
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
> index 71c2a3b54da7..f15d0c3fb7cd 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.c
> +++ b/fs/fuse/kio/pcs/pcs_rpc.c
> @@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct pcs_msg * msg, int accou
> INIT_HLIST_NODE(&msg->kill_link);
> pcs_rpc_account_msg(ep, msg, account);
> msg->destructor = pcs_rpc_input_destructor;
> + msg->flags = 0;
> }
>
> struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen)
> @@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg)
> msg->rpc = NULL;
> INIT_HLIST_NODE(&msg->kill_link);
> msg->destructor = pcs_msg_output_destructor;
> + msg->flags = 0;
> }
>
> struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen)
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
> index 805b8f1e56b0..7eb231b7260d 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.c
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.c
> @@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg)
> }
> }
>
> +int pcs_msg_abort(struct pcs_msg *msg, bool wait)
> +{
> + set_bit(PCS_MSG_ABORTED, &msg->flags);
> +
> + /* pair with the smp_mb() between set PCS_MSG_BUSY and test PCS_MSG_ABORTED */
> + smp_mb();
> +
> + while (test_bit(PCS_MSG_BUSY, &msg->flags)) {
> + if (wait)
> + wait_on_bit(&msg->flags, PCS_MSG_BUSY, TASK_INTERRUPTIBLE);
> + else
> + return -EBUSY;
> + }
> +
> + return 0;
> +}
> +
> static void sio_push(struct pcs_sockio * sio)
> {
> TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
> @@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
> return;
> }
>
> + set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> + /* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
> + smp_mb();
> +
> + /* Shouldn't abort a half sent message */
> + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && !sio->write_offset) {
> + pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
> + goto skip_send;
> + }
> +
> /* TODO: cond resched here? */
> while (sio->write_offset < msg->size) {
> size_t left = msg->size - sio->write_offset;
> @@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
> return;
> }
> }
> +
> +skip_send:
> + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
> +
> list_del_init(&msg->list);
> sio->write_queue_len -= msg->size;
>
> @@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * sio, int datalen)
>
> msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO);
> if (msg) {
> -
> pcs_msg_io_init(msg);
> pcs_account_msg(sio, msg);
> msg->destructor = pcs_msg_input_destructor;
> @@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg)
> clone->destructor = pcs_io_msg_output_destructor;
> clone->private = msg;
> clone->get_iter = get_iter_clone;
> + clone->flags = 0;
> }
> return clone;
> }
> @@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int copy_len)
> clone->_inline_len = (short)copy_len;
> memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len);
> clone->get_iter = get_iter_cow_clone;
> + clone->flags = 0;
> }
> return clone;
> }
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
> index 09870b38cdad..b14e992a22d3 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.h
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.h
> @@ -35,6 +35,17 @@ struct pcs_api_channel
> unsigned msg_count;
> };
>
> +/**
> + * pcs_msg flags
> + *
> + * PCS_MSG_BUSY: set when the msg is under IO
> + * PCS_MSG_ABORTED: the msg was aborted.
> + */
> +enum pcs_msg_flag {
> + PCS_MSG_BUSY,
> + PCS_MSG_ABORTED,
> +};
> +
> __pre_packed struct pcs_msg
> {
> struct __pre_aligned(16) {
> @@ -58,6 +69,8 @@ __pre_packed struct pcs_msg
> unsigned char stage;
> abs_time_t io_start_time;
>
> + unsigned long flags;
> +
> struct hlist_node kill_link;
>
> void (*get_iter)(struct pcs_msg *, int offset, struct iov_iter *it,
> @@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
> }
> void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
> void pcs_msg_sent(struct pcs_msg * msg);
> +int pcs_msg_abort(struct pcs_msg *msg, bool wait);
>
> static inline void * msg_inline_head(struct pcs_msg * msg)
> {
> --
> 2.39.5 (Apple Git-154)
More information about the Devel
mailing list