[Devel] [PATCH VZ9 2/3] net: zerocopy over unix sockets
Pavel Tikhomirov
ptikhomirov at virtuozzo.com
Tue Jan 16 08:13:11 MSK 2024
On 09/01/2024 21:38, Alexey Kuznetsov wrote:
> Observation is that af_unix sockets today became slower
> and eat a lot of more cpu than 100G ethernet. So, implement
> MSG_ZEROCOPY over af_unix sockets to be able to talk to
> local services without collapse of performance.
>
> Unexpectedly, this makes sense! F.e. zerocopy cannot be done
> in TCP over loopback, because skbs when passing over loopback
> change ownership. But unix sockets traditionally implemented
> in the way that skbs queued on destination sockets still preserve
> ownership of source socket, so that completion notifications
> can be delivered back to sender without problems.
>
> Code looks neat and clean and might be submitted to mainstream
> one day. A little bit of dirt is that it shares IP_RECVERR
> control message, but rather this dirt is not mine, but
> by original author, who used protocol level message for generic
> socket level notifications without any good reasons, creating
> a mess for users.
>
> Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
> ---
> net/core/datagram.c | 2 +-
> net/core/sock.c | 3 ++
> net/unix/af_unix.c | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
> 3 files changed, 98 insertions(+), 15 deletions(-)
>
> diff --git a/net/core/datagram.c b/net/core/datagram.c
> index 8eb5999..980248b 100644
> --- a/net/core/datagram.c
> +++ b/net/core/datagram.c
> @@ -640,7 +640,7 @@ int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
> skb->data_len += copied;
> skb->len += copied;
> skb->truesize += truesize;
> - if (sk && sk->sk_type == SOCK_STREAM) {
> + if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family != PF_UNIX) {
> sk_wmem_queued_add(sk, truesize);
> sk_mem_charge(sk, truesize);
> } else {
> diff --git a/net/core/sock.c b/net/core/sock.c
> index 5d7e16a..fe8e102 100644
> --- a/net/core/sock.c
> +++ b/net/core/sock.c
> @@ -1405,6 +1405,9 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
> (sk->sk_type == SOCK_DGRAM &&
> sk->sk_protocol == IPPROTO_UDP)))
> ret = -EOPNOTSUPP;
> + } else if (sk->sk_family == PF_UNIX) {
> + if (sk->sk_type == SOCK_DGRAM)
> + ret = -EOPNOTSUPP;
Likely we don't want to enable zerocopy for SOCK_SEQPACKET. Maybe change
to: "if (sk->sk_type != SOCK_STREAM)"
> } else if (sk->sk_family != PF_RDS) {
> ret = -EOPNOTSUPP;
> }
> diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
> index 36c3cb9..e32fe47 100644
> --- a/net/unix/af_unix.c
> +++ b/net/unix/af_unix.c
> @@ -115,6 +115,7 @@
> #include <linux/freezer.h>
> #include <linux/file.h>
> #include <linux/btf_ids.h>
> +#include <linux/errqueue.h>
>
> #include "scm.h"
>
> @@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> {
> struct sock *sk = sock->sk;
> struct sock *other = NULL;
> + struct ubuf_info *uarg = NULL;
> int err, size;
> struct sk_buff *skb;
> int sent = 0;
> @@ -2083,22 +2085,37 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> if (sk->sk_shutdown & SEND_SHUTDOWN)
> goto pipe_err;
>
> + if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
> + uarg = msg_zerocopy_alloc(sk, len);
> + if (!uarg) {
> + err = -ENOBUFS;
> + goto out_err;
> + }
> + }
> +
> while (sent < len) {
> size = len - sent;
>
> - /* Keep two messages in the pipe so it schedules better */
> - size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
> + if (!uarg) {
> + /* Keep two messages in the pipe so it schedules better */
> + size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
> +
> + /* allow fallback to order-0 allocations */
> + size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
>
> - /* allow fallback to order-0 allocations */
> - size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
> + data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
>
> - data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
> + data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
>
> - data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
> + skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
> + msg->msg_flags & MSG_DONTWAIT, &err,
> + get_order(UNIX_SKB_FRAGS_SZ));
> + } else {
> + size = min_t(int, size, sk->sk_sndbuf);
> + skb = sock_alloc_send_pskb(sk, 0, 0,
> + msg->msg_flags & MSG_DONTWAIT, &err, 0);
> + }
>
> - skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
> - msg->msg_flags & MSG_DONTWAIT, &err,
> - get_order(UNIX_SKB_FRAGS_SZ));
> if (!skb)
> goto out_err;
>
> @@ -2110,10 +2127,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> }
> fds_sent = true;
>
> - skb_put(skb, size - data_len);
> - skb->data_len = data_len;
> - skb->len = size;
> - err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
> + if (!uarg) {
> + skb_put(skb, size - data_len);
> + skb->data_len = data_len;
> + skb->len = size;
> + err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
> + } else {
> + err = skb_zerocopy_iter_stream(sk, skb, msg, size, uarg);
> + if (err > 0) {
> + size = err;
> + err = 0;
> + }
> + }
> if (err) {
> kfree_skb(skb);
> goto out_err;
> @@ -2133,6 +2158,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> sent += size;
> }
>
> + net_zcopy_put(uarg);
> scm_destroy(&scm);
>
> return sent;
> @@ -2145,6 +2171,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
> send_sig(SIGPIPE, current, 0);
> err = -EPIPE;
> out_err:
> + net_zcopy_put_abort(uarg, true);
> scm_destroy(&scm);
> return sent ? : err;
> }
> @@ -2725,6 +2752,46 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
> return copied ? : err;
> }
>
> +static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
> +{
> + struct sock_exterr_skb *serr;
> + struct sk_buff *skb;
> + struct {
> + struct sock_extended_err ee;
> + } errhdr;
> + int err;
> + int copied;
> +
> + err = -EAGAIN;
> + skb = sock_dequeue_err_skb(sk);
> + if (!skb)
> + goto out;
> +
> + copied = skb->len;
> + if (copied > len) {
> + msg->msg_flags |= MSG_TRUNC;
> + copied = len;
> + }
> + err = skb_copy_datagram_msg(skb, 0, msg, copied);
> + if (unlikely(err)) {
> + kfree_skb(skb);
> + return err;
> + }
> +
> + serr = SKB_EXT_ERR(skb);
> +
> + memcpy(&errhdr.ee, &serr->ee, sizeof(struct sock_extended_err));
> +
> + put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(errhdr), &errhdr);
> +
> + msg->msg_flags |= MSG_ERRQUEUE;
> + err = copied;
> +
> + consume_skb(skb);
> +out:
> + return err;
> +}
> +
> static int unix_stream_read_actor(struct sk_buff *skb,
> int skip, int chunk,
> struct unix_stream_read_state *state)
> @@ -2769,6 +2836,10 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
> return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
> flags & ~MSG_DONTWAIT, NULL);
> #endif
> +
> + if (unlikely(flags & MSG_ERRQUEUE))
> + return unix_recv_error(sk, msg, size);
In case of MSG_PEEK it might be not what user wants to receive zero-copy
notification error message.
> +
> return unix_stream_read_generic(&state, true);
> }
>
> @@ -2776,6 +2847,15 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
> int skip, int chunk,
> struct unix_stream_read_state *state)
> {
> + /* Zerocopy pages cannot be spliced, alas. It looks like splice interface
> + * gives no way to notify about actual page consumption. So, we have to copy.
> + * This path is not going be legit, sender will be notified and will stop zerocopying.
> + */
> + int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
> +
I prefer not to put function call on the same line with variable
declaration, especially when this function requires error checking. This
way codding style can both confirm to a) having newline between
variables and other code and b) checking function return on the next
line without extra newline in between.
> + if (err)
> + return err;
> +
> return skb_splice_bits(skb, state->socket->sk,
> UNIXCB(skb).consumed + skip,
> state->pipe, chunk, state->splice_flags);
> @@ -2962,7 +3042,7 @@ static __poll_t unix_poll(struct file *file, struct socket *sock, poll_table *wa
> mask = 0;
>
> /* exceptional events? */
> - if (sk->sk_err)
> + if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
> mask |= EPOLLERR;
> if (sk->sk_shutdown == SHUTDOWN_MASK)
> mask |= EPOLLHUP;
--
Best regards, Tikhomirov Pavel
Senior Software Developer, Virtuozzo.
More information about the Devel
mailing list