[Devel] [PATCH RHEL9 COMMIT] net: zerocopy over unix sockets

Konstantin Khorenko khorenko at virtuozzo.com
Thu Jan 11 19:15:35 MSK 2024


The commit is pushed to "branch-rh9-5.14.0-362.8.1.vz9.35.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-362.8.1.vz9.35.6
------>
commit 137e8807d5b005a1c16aaa635c1dceb9928f5272
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date:   Tue Jan 9 21:38:08 2024 +0800

    net: zerocopy over unix sockets
    
    The observation is that af_unix sockets today have become slower
    and eat a lot more CPU than 100G ethernet. So, implement
    MSG_ZEROCOPY over af_unix sockets to be able to talk to
    local services without a collapse in performance.
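
    For reference, userspace opts in and transmits in the usual
    MSG_ZEROCOPY way. A minimal sketch of the send side (error handling
    trimmed; `fd', `buf' and `len' are assumed to be a connected AF_UNIX
    stream socket and a buffer that stays untouched until completion):

        int one = 1;

        /* Opt the socket in to zerocopy transmission. */
        if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0)
                perror("setsockopt(SO_ZEROCOPY)");

        /* The kernel pins the pages of `buf' instead of copying them. */
        if (send(fd, buf, len, MSG_ZEROCOPY) < 0)
                perror("send(MSG_ZEROCOPY)");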
    
    Unexpectedly, this makes sense! E.g. zerocopy cannot be done
    in TCP over loopback, because skbs change ownership when passing
    over loopback. But unix sockets are traditionally implemented
    in such a way that skbs queued on the destination socket still
    preserve the ownership of the source socket, so completion
    notifications can be delivered back to the sender without problems.
    
    The code looks neat and clean and might be submitted to mainline
    one day. A little bit of dirt is that it shares the IP_RECVERR
    control message, but that dirt is not mine; it comes from the
    original author, who used a protocol-level message for generic
    socket-level notifications without any good reason, creating
    a mess for users.
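
    Completions are then read from the error queue and, as noted above,
    arrive as a SOL_IP/IP_RECVERR control message even on AF_UNIX. A
    hedged sketch of the notification read side, assuming <sys/socket.h>,
    <netinet/in.h> and <linux/errqueue.h> are included (field meanings
    follow Documentation/networking/msg_zerocopy.rst):

        struct sock_extended_err *serr;
        struct msghdr msg = { 0 };
        struct cmsghdr *cm;
        char control[128];

        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        /* Returns -1/EAGAIN when no notification is queued yet. */
        if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0)
                return;

        cm = CMSG_FIRSTHDR(&msg);
        if (cm && cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) {
                serr = (struct sock_extended_err *)CMSG_DATA(cm);
                if (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
                        /* [ee_info, ee_data] is the range of completed
                         * sends; SO_EE_CODE_ZEROCOPY_COPIED in ee_code
                         * means the kernel fell back to copying, so the
                         * sender may stop using MSG_ZEROCOPY. */
                }
        }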
    
    https://pmc.acronis.work/browse/VSTOR-79527
    
    Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
    
    Feature: net: zerocopy over unix sockets
---
 net/core/datagram.c |   2 +-
 net/core/sock.c     |   3 ++
 net/unix/af_unix.c  | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
 3 files changed, 98 insertions(+), 15 deletions(-)

diff --git a/net/core/datagram.c b/net/core/datagram.c
index 07f1c651542f..c59c751bed13 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -644,7 +644,7 @@ int __zerocopy_sg_from_iter(struct msghdr *msg, struct sock *sk,
 		skb->data_len += copied;
 		skb->len += copied;
 		skb->truesize += truesize;
-		if (sk && sk->sk_type == SOCK_STREAM) {
+		if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family != PF_UNIX) {
 			sk_wmem_queued_add(sk, truesize);
 			if (!skb_zcopy_pure(skb))
 				sk_mem_charge(sk, truesize);
diff --git a/net/core/sock.c b/net/core/sock.c
index 3276f74dc018..e53ec1279c14 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1447,6 +1447,9 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
 			      (sk->sk_type == SOCK_DGRAM &&
 			       sk->sk_protocol == IPPROTO_UDP)))
 				ret = -EOPNOTSUPP;
+		} else if (sk->sk_family == PF_UNIX) {
+			if (sk->sk_type == SOCK_DGRAM)
+				ret = -EOPNOTSUPP;
 		} else if (sk->sk_family != PF_RDS) {
 			ret = -EOPNOTSUPP;
 		}
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index a2448274b7ad..8ebaaa41fa73 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -115,6 +115,7 @@
 #include <linux/freezer.h>
 #include <linux/file.h>
 #include <linux/btf_ids.h>
+#include <linux/errqueue.h>
 
 #include "scm.h"
 
@@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 {
 	struct sock *sk = sock->sk;
 	struct sock *other = NULL;
+	struct ubuf_info *uarg = NULL;
 	int err, size;
 	struct sk_buff *skb;
 	int sent = 0;
@@ -2083,22 +2085,37 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 	if (sk->sk_shutdown & SEND_SHUTDOWN)
 		goto pipe_err;
 
+	if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
+		uarg = msg_zerocopy_alloc(sk, len);
+		if (!uarg) {
+			err = -ENOBUFS;
+			goto out_err;
+		}
+	}
+
 	while (sent < len) {
 		size = len - sent;
 
-		/* Keep two messages in the pipe so it schedules better */
-		size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+		if (!uarg) {
+			/* Keep two messages in the pipe so it schedules better */
+			size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+
+			/* allow fallback to order-0 allocations */
+			size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
 
-		/* allow fallback to order-0 allocations */
-		size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
+			data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
 
-		data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
+			data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
 
-		data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
+			skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
+						   msg->msg_flags & MSG_DONTWAIT, &err,
+						   get_order(UNIX_SKB_FRAGS_SZ));
+		} else {
+			size = min_t(int, size, sk->sk_sndbuf);
+			skb = sock_alloc_send_pskb(sk, 0, 0,
+						   msg->msg_flags & MSG_DONTWAIT, &err, 0);
+		}
 
-		skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
-					   msg->msg_flags & MSG_DONTWAIT, &err,
-					   get_order(UNIX_SKB_FRAGS_SZ));
 		if (!skb)
 			goto out_err;
 
@@ -2110,10 +2127,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		}
 		fds_sent = true;
 
-		skb_put(skb, size - data_len);
-		skb->data_len = data_len;
-		skb->len = size;
-		err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+		if (!uarg) {
+			skb_put(skb, size - data_len);
+			skb->data_len = data_len;
+			skb->len = size;
+			err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+		} else {
+			err = skb_zerocopy_iter_stream(sk, skb, msg, size, uarg);
+			if (err > 0) {
+				size = err;
+				err = 0;
+			}
+		}
 		if (err) {
 			kfree_skb(skb);
 			goto out_err;
@@ -2133,6 +2158,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		sent += size;
 	}
 
+	net_zcopy_put(uarg);
 	scm_destroy(&scm);
 
 	return sent;
@@ -2145,6 +2171,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		send_sig(SIGPIPE, current, 0);
 	err = -EPIPE;
 out_err:
+	net_zcopy_put_abort(uarg, true);
 	scm_destroy(&scm);
 	return sent ? : err;
 }
@@ -2708,6 +2735,46 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
 	return copied ? : err;
 }
 
+static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
+{
+	struct sock_exterr_skb *serr;
+	struct sk_buff *skb;
+	struct {
+		struct sock_extended_err ee;
+	} errhdr;
+	int err;
+	int copied;
+
+	err = -EAGAIN;
+	skb = sock_dequeue_err_skb(sk);
+	if (!skb)
+		goto out;
+
+	copied = skb->len;
+	if (copied > len) {
+		msg->msg_flags |= MSG_TRUNC;
+		copied = len;
+	}
+	err = skb_copy_datagram_msg(skb, 0, msg, copied);
+	if (unlikely(err)) {
+		kfree_skb(skb);
+		return err;
+	}
+
+	serr = SKB_EXT_ERR(skb);
+
+	memcpy(&errhdr.ee, &serr->ee, sizeof(struct sock_extended_err));
+
+	put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(errhdr), &errhdr);
+
+	msg->msg_flags |= MSG_ERRQUEUE;
+	err = copied;
+
+	consume_skb(skb);
+out:
+	return err;
+}
+
 static int unix_stream_read_actor(struct sk_buff *skb,
 				  int skip, int chunk,
 				  struct unix_stream_read_state *state)
@@ -2752,6 +2819,10 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
 		return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
 					    flags & ~MSG_DONTWAIT, NULL);
 #endif
+
+	if (unlikely(flags & MSG_ERRQUEUE))
+		return unix_recv_error(sk, msg, size);
+
 	return unix_stream_read_generic(&state, true);
 }
 
@@ -2759,6 +2830,15 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
 				    int skip, int chunk,
 				    struct unix_stream_read_state *state)
 {
+	/* Zerocopy pages cannot be spliced, alas. The splice interface gives no way
+	 * to notify about actual page consumption, so we have to copy. This path will
+	 * not stay zerocopy: the sender is notified of the copy and will stop zerocopying.
+	 */
+	int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
+
+	if (err)
+		return err;
+
 	return skb_splice_bits(skb, state->socket->sk,
 			       UNIXCB(skb).consumed + skip,
 			       state->pipe, chunk, state->splice_flags);
@@ -2945,7 +3025,7 @@ static __poll_t unix_poll(struct file *file, struct socket *sock, poll_table *wa
 	mask = 0;
 
 	/* exceptional events? */
-	if (sk->sk_err)
+	if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
 		mask |= EPOLLERR;
 	if (sk->sk_shutdown == SHUTDOWN_MASK)
 		mask |= EPOLLHUP;

