[Devel] [PATCH VZ9 2/3] net: zerocopy over unix sockets

Konstantin Khorenko khorenko at virtuozzo.com
Wed Jan 10 19:37:49 MSK 2024


Pasha, can you please review this?

--
Best regards,

Konstantin Khorenko,
Virtuozzo Linux Kernel Team

On 09.01.2024 14:38, Alexey Kuznetsov wrote:
> The observation is that af_unix sockets have become slower
> and eat a lot more CPU than 100G Ethernet. So implement
> MSG_ZEROCOPY over af_unix sockets, so that we can talk to
> local services without a collapse in performance.
> 
> Unexpectedly, this makes sense! E.g. zerocopy cannot be done
> in TCP over loopback, because skbs change ownership when passing
> over loopback. Unix sockets, however, are traditionally implemented
> so that skbs queued on the destination socket keep the ownership
> of the source socket, so completion notifications can be
> delivered back to the sender without problems.
> 
> The code looks neat and clean and might be submitted upstream
> one day. One bit of dirt is that it reuses the IP_RECVERR
> control message, but that dirt is not mine: it comes from the
> original author, who used a protocol-level message for generic
> socket-level notifications without any good reason, creating
> a mess for users.
> 
> Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
> ---
>   net/core/datagram.c |   2 +-
>   net/core/sock.c     |   3 ++
>   net/unix/af_unix.c  | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
>   3 files changed, 98 insertions(+), 15 deletions(-)
> 
> diff --git a/net/core/datagram.c b/net/core/datagram.c
> index 8eb5999..980248b 100644
> --- a/net/core/datagram.c
> +++ b/net/core/datagram.c
> @@ -640,7 +640,11 @@ int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
>   		skb->data_len += copied;
>   		skb->len += copied;
>   		skb->truesize += truesize;
> -		if (sk && sk->sk_type == SOCK_STREAM) {
> +		/* AF_UNIX stream sockets do not use TCP-style sk_wmem_queued /
> +		 * sk_mem_charge() accounting; their frags are charged to the
> +		 * sender's write allocation by the else branch instead.
> +		 */
> +		if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family != PF_UNIX) {
>   			sk_wmem_queued_add(sk, truesize);
>   			sk_mem_charge(sk, truesize);
>   		} else {
> diff --git a/net/core/sock.c b/net/core/sock.c
> index 5d7e16a..fe8e102 100644
> --- a/net/core/sock.c
> +++ b/net/core/sock.c
> @@ -1405,6 +1405,10 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
>   			      (sk->sk_type == SOCK_DGRAM &&
>   			       sk->sk_protocol == IPPROTO_UDP)))
>   				ret = -EOPNOTSUPP;
> +		} else if (sk->sk_family == PF_UNIX) {
> +			/* Only unix_stream_sendmsg() supports MSG_ZEROCOPY */
> +			if (sk->sk_type != SOCK_STREAM)
> +				ret = -EOPNOTSUPP;
>   		} else if (sk->sk_family != PF_RDS) {
>   			ret = -EOPNOTSUPP;
>   		}
> diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
> index 36c3cb9..e32fe47 100644
> --- a/net/unix/af_unix.c
> +++ b/net/unix/af_unix.c
> @@ -115,6 +115,7 @@
>   #include <linux/freezer.h>
>   #include <linux/file.h>
>   #include <linux/btf_ids.h>
> +#include <linux/errqueue.h>
>   
>   #include "scm.h"
>   
> @@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>   {
>   	struct sock *sk = sock->sk;
>   	struct sock *other = NULL;
> +	struct ubuf_info *uarg = NULL;
>   	int err, size;
>   	struct sk_buff *skb;
>   	int sent = 0;
> @@ -2083,22 +2085,40 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>   	if (sk->sk_shutdown & SEND_SHUTDOWN)
>   		goto pipe_err;
>   
> +	if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
> +		uarg = msg_zerocopy_alloc(sk, len);
> +		if (!uarg) {
> +			err = -ENOBUFS;
> +			goto out_err;
> +		}
> +	}
> +
>   	while (sent < len) {
>   		size = len - sent;
>   
> -		/* Keep two messages in the pipe so it schedules better */
> -		size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
> +		if (!uarg) {
> +			/* Keep two messages in the pipe so it schedules better */
> +			size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
> +
> +			/* allow fallback to order-0 allocations */
> +			size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
>   
> -		/* allow fallback to order-0 allocations */
> -		size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
> +			data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
>   
> -		data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
> +			data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
>   
> -		data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
> +			skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
> +						   msg->msg_flags & MSG_DONTWAIT, &err,
> +						   get_order(UNIX_SKB_FRAGS_SZ));
> +		} else {
> +			/* Zerocopy: the skb gets no linear data; user pages
> +			 * are attached as frags by skb_zerocopy_iter_stream().
> +			 */
> +			size = min_t(int, size, sk->sk_sndbuf);
> +			skb = sock_alloc_send_pskb(sk, 0, 0,
> +						   msg->msg_flags & MSG_DONTWAIT, &err, 0);
> +		}
>   
> -		skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
> -					   msg->msg_flags & MSG_DONTWAIT, &err,
> -					   get_order(UNIX_SKB_FRAGS_SZ));
>   		if (!skb)
>   			goto out_err;
>   
> @@ -2110,10 +2130,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>   		}
>   		fds_sent = true;
>   
> -		skb_put(skb, size - data_len);
> -		skb->data_len = data_len;
> -		skb->len = size;
> -		err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
> +		if (!uarg) {
> +			skb_put(skb, size - data_len);
> +			skb->data_len = data_len;
> +			skb->len = size;
> +			err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
> +		} else {
> +			err = skb_zerocopy_iter_stream(sk, skb, msg, size, uarg);
> +			if (err > 0) {
> +				size = err;
> +				err = 0;
> +			}
> +		}
>   		if (err) {
>   			kfree_skb(skb);
>   			goto out_err;
> @@ -2133,6 +2161,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>   		sent += size;
>   	}
>   
> +	net_zcopy_put(uarg);
>   	scm_destroy(&scm);
>   
>   	return sent;
> @@ -2145,6 +2174,14 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
>   		send_sig(SIGPIPE, current, 0);
>   	err = -EPIPE;
>   out_err:
> +	/* Queued skbs hold their own uarg references and must still yield a
> +	 * completion for the data reported as sent, so roll the notification
> +	 * back only if nothing was sent (as tcp_sendmsg_locked() does).
> +	 */
> +	if (sent)
> +		net_zcopy_put(uarg);
> +	else
> +		net_zcopy_put_abort(uarg, true);
>   	scm_destroy(&scm);
>   	return sent ? : err;
>   }
> @@ -2725,6 +2762,42 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
>   	return copied ? : err;
>   }
>   
> +/* Dequeue one skb from sk's error queue (in practice a MSG_ZEROCOPY
> + * completion) and report it as a SOL_IP/IP_RECVERR control message,
> + * which is the format existing MSG_ZEROCOPY users already parse.
> + * Returns the number of payload bytes copied, -EAGAIN if the queue is
> + * empty, or a negative error from skb_copy_datagram_msg().
> + */
> +static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
> +{
> +	struct sock_exterr_skb *serr;
> +	struct sk_buff *skb;
> +	int copied;
> +	int err;
> +
> +	skb = sock_dequeue_err_skb(sk);
> +	if (!skb)
> +		return -EAGAIN;
> +
> +	copied = skb->len;
> +	if (copied > len) {
> +		msg->msg_flags |= MSG_TRUNC;
> +		copied = len;
> +	}
> +	err = skb_copy_datagram_msg(skb, 0, msg, copied);
> +	if (unlikely(err)) {
> +		kfree_skb(skb);
> +		return err;
> +	}
> +
> +	serr = SKB_EXT_ERR(skb);
> +	put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(serr->ee), &serr->ee);
> +
> +	msg->msg_flags |= MSG_ERRQUEUE;
> +	consume_skb(skb);
> +	return copied;
> +}
> +
>   static int unix_stream_read_actor(struct sk_buff *skb,
>   				  int skip, int chunk,
>   				  struct unix_stream_read_state *state)
> @@ -2769,6 +2842,10 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
>   		return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
>   					    flags & ~MSG_DONTWAIT, NULL);
>   #endif
> +
> +	if (unlikely(flags & MSG_ERRQUEUE))
> +		return unix_recv_error(sock->sk, msg, size);
> +
>   	return unix_stream_read_generic(&state, true);
>   }
>   
> @@ -2776,6 +2853,16 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
>   				    int skip, int chunk,
>   				    struct unix_stream_read_state *state)
>   {
> +	/* Zerocopy pages cannot be spliced: splice offers no way to learn
> +	 * when the pages are actually consumed, so copy them here instead.
> +	 * The sender's completion is then flagged as copied, so it can stop
> +	 * using MSG_ZEROCOPY.
> +	 */
> +	int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
> +
> +	if (err)
> +		return err;
> +
>   	return skb_splice_bits(skb, state->socket->sk,
>   			       UNIXCB(skb).consumed + skip,
>   			       state->pipe, chunk, state->splice_flags);
> @@ -2962,7 +3049,7 @@ static __poll_t unix_poll(struct file *file, struct socket *sock, poll_table *wa
>   	mask = 0;
>   
>   	/* exceptional events? */
> -	if (sk->sk_err)
> +	if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
>   		mask |= EPOLLERR;
>   	if (sk->sk_shutdown == SHUTDOWN_MASK)
>   		mask |= EPOLLHUP;


More information about the Devel mailing list