[Devel] [PATCH VZ9 2/3] net: zerocopy over unix sockets

Alexey Kuznetsov kuznet at virtuozzo.com
Tue Jan 9 16:38:08 MSK 2024


The observation is that af_unix sockets have become slower and
now eat a lot more CPU than 100G ethernet. So, implement
MSG_ZEROCOPY over af_unix sockets to be able to talk to
local services without a collapse in performance.

Unexpectedly, this makes sense! E.g. zerocopy cannot be done
in TCP over loopback, because skbs change ownership when passing
over loopback. But unix sockets are traditionally implemented
in such a way that skbs queued on the destination socket still
preserve the ownership of the source socket, so completion
notifications can be delivered back to the sender without problems.
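
For illustration only (this is not part of the patch), a userspace
sender would look roughly like the sketch below. It assumes the usual
SO_ZEROCOPY/MSG_ZEROCOPY interface that TCP already exposes carries
over unchanged; the socket path and function name are made up, and the
fallback defines are only for older libc headers:

    #include <errno.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <unistd.h>

    #ifndef SO_ZEROCOPY
    #define SO_ZEROCOPY	60		/* asm-generic value, for older libc headers */
    #endif
    #ifndef MSG_ZEROCOPY
    #define MSG_ZEROCOPY	0x4000000	/* for older libc headers */
    #endif

    /* Connect to a stream unix socket, opt in to zerocopy and queue one send.
     * Returns the connected fd on success, -errno on failure.
     */
    static int unix_send_zerocopy(const char *path, const void *buf, size_t len)
    {
    	struct sockaddr_un sun = { .sun_family = AF_UNIX };
    	int one = 1;
    	int fd, err;

    	fd = socket(AF_UNIX, SOCK_STREAM, 0);
    	if (fd < 0)
    		return -errno;

    	strncpy(sun.sun_path, path, sizeof(sun.sun_path) - 1);
    	if (connect(fd, (struct sockaddr *)&sun, sizeof(sun)) < 0)
    		goto err;

    	/* Opt in once per socket; the patch rejects this for SOCK_DGRAM. */
    	if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0)
    		goto err;

    	/* The pages of buf stay pinned until the completion notification
    	 * arrives on the error queue, so buf must not be reused before then.
    	 */
    	if (send(fd, buf, len, MSG_ZEROCOPY) < 0)
    		goto err;

    	return fd;
    err:
    	err = -errno;
    	close(fd);
    	return err;
    }

Error handling is kept minimal here; a real sender would also have to
track which buffers are still pinned.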

The code looks neat and clean and might be submitted to mainstream
one day. A little bit of dirt is that it shares the IP_RECVERR
control message, but this dirt is not mine; it comes from the
original author, who used a protocol-level message for generic
socket-level notifications without any good reason, creating
a mess for users.
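
And a matching sketch (again illustrative, not from the patch) of
draining the completions. Since the patch reuses the SOL_IP/IP_RECVERR
control message, the parsing is assumed to be the same as for TCP
zerocopy: a struct sock_extended_err with ee_origin ==
SO_EE_ORIGIN_ZEROCOPY and the range of completed sends in
ee_info..ee_data:

    #include <errno.h>
    #include <stdio.h>
    #include <netinet/in.h>		/* SOL_IP, IP_RECVERR */
    #include <linux/errqueue.h>	/* struct sock_extended_err, SO_EE_* */
    #include <sys/socket.h>

    /* Drain pending zerocopy completion notifications from the error queue. */
    static int unix_read_zc_completions(int fd)
    {
    	char control[128];
    	struct msghdr msg = { 0 };
    	struct cmsghdr *cm;

    	msg.msg_control = control;
    	msg.msg_controllen = sizeof(control);

    	/* Completion skbs carry no payload, only the control message. */
    	if (recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT) < 0)
    		return -errno;		/* EAGAIN: nothing queued yet */

    	for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
    		struct sock_extended_err *serr;

    		if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR)
    			continue;

    		serr = (struct sock_extended_err *)CMSG_DATA(cm);
    		if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY)
    			continue;

    		/* ee_info..ee_data is the range of completed zerocopy sends;
    		 * SO_EE_CODE_ZEROCOPY_COPIED means the kernel fell back to a
    		 * copy (e.g. because the receiver spliced the data).
    		 */
    		printf("sends %u..%u done%s\n", serr->ee_info, serr->ee_data,
    		       serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED ?
    		       " (copied)" : "");
    	}
    	return 0;
    }

The unix_poll() change below also reports EPOLLERR while the error
queue is non-empty, so the sender can wait for completions with
poll()/epoll instead of polling MSG_ERRQUEUE blindly.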

Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
---
 net/core/datagram.c |   2 +-
 net/core/sock.c     |   3 ++
 net/unix/af_unix.c  | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
 3 files changed, 98 insertions(+), 15 deletions(-)

diff --git a/net/core/datagram.c b/net/core/datagram.c
index 8eb5999..980248b 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -640,7 +640,7 @@ int __zerocopy_sg_from_iter(struct sock *sk, struct sk_buff *skb,
 		skb->data_len += copied;
 		skb->len += copied;
 		skb->truesize += truesize;
-		if (sk && sk->sk_type == SOCK_STREAM) {
+		if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family != PF_UNIX) {
 			sk_wmem_queued_add(sk, truesize);
 			sk_mem_charge(sk, truesize);
 		} else {
diff --git a/net/core/sock.c b/net/core/sock.c
index 5d7e16a..fe8e102 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1405,6 +1405,9 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
 			      (sk->sk_type == SOCK_DGRAM &&
 			       sk->sk_protocol == IPPROTO_UDP)))
 				ret = -EOPNOTSUPP;
+		} else if (sk->sk_family == PF_UNIX) {
+			if (sk->sk_type == SOCK_DGRAM)
+				ret = -EOPNOTSUPP;
 		} else if (sk->sk_family != PF_RDS) {
 			ret = -EOPNOTSUPP;
 		}
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 36c3cb9..e32fe47 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -115,6 +115,7 @@
 #include <linux/freezer.h>
 #include <linux/file.h>
 #include <linux/btf_ids.h>
+#include <linux/errqueue.h>
 
 #include "scm.h"
 
@@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 {
 	struct sock *sk = sock->sk;
 	struct sock *other = NULL;
+	struct ubuf_info *uarg = NULL;
 	int err, size;
 	struct sk_buff *skb;
 	int sent = 0;
@@ -2083,22 +2085,37 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 	if (sk->sk_shutdown & SEND_SHUTDOWN)
 		goto pipe_err;
 
+	if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
+		uarg = msg_zerocopy_alloc(sk, len);
+		if (!uarg) {
+			err = -ENOBUFS;
+			goto out_err;
+		}
+	}
+
 	while (sent < len) {
 		size = len - sent;
 
-		/* Keep two messages in the pipe so it schedules better */
-		size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+		if (!uarg) {
+			/* Keep two messages in the pipe so it schedules better */
+			size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+
+			/* allow fallback to order-0 allocations */
+			size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
 
-		/* allow fallback to order-0 allocations */
-		size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
+			data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
 
-		data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
+			data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
 
-		data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
+			skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
+						   msg->msg_flags & MSG_DONTWAIT, &err,
+						   get_order(UNIX_SKB_FRAGS_SZ));
+		} else {
+			size = min_t(int, size, sk->sk_sndbuf);
+			skb = sock_alloc_send_pskb(sk, 0, 0,
+						   msg->msg_flags & MSG_DONTWAIT, &err, 0);
+		}
 
-		skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
-					   msg->msg_flags & MSG_DONTWAIT, &err,
-					   get_order(UNIX_SKB_FRAGS_SZ));
 		if (!skb)
 			goto out_err;
 
@@ -2110,10 +2127,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		}
 		fds_sent = true;
 
-		skb_put(skb, size - data_len);
-		skb->data_len = data_len;
-		skb->len = size;
-		err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+		if (!uarg) {
+			skb_put(skb, size - data_len);
+			skb->data_len = data_len;
+			skb->len = size;
+			err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+		} else {
+			err = skb_zerocopy_iter_stream(sk, skb, msg, size, uarg);
+			if (err > 0) {
+				size = err;
+				err = 0;
+			}
+		}
 		if (err) {
 			kfree_skb(skb);
 			goto out_err;
@@ -2133,6 +2158,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		sent += size;
 	}
 
+	net_zcopy_put(uarg);
 	scm_destroy(&scm);
 
 	return sent;
@@ -2145,6 +2171,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
 		send_sig(SIGPIPE, current, 0);
 	err = -EPIPE;
 out_err:
+	net_zcopy_put_abort(uarg, true);
 	scm_destroy(&scm);
 	return sent ? : err;
 }
@@ -2725,6 +2752,46 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
 	return copied ? : err;
 }
 
+static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
+{
+	struct sock_exterr_skb *serr;
+	struct sk_buff *skb;
+	struct {
+		struct sock_extended_err ee;
+	} errhdr;
+	int err;
+	int copied;
+
+	err = -EAGAIN;
+	skb = sock_dequeue_err_skb(sk);
+	if (!skb)
+		goto out;
+
+	copied = skb->len;
+	if (copied > len) {
+		msg->msg_flags |= MSG_TRUNC;
+		copied = len;
+	}
+	err = skb_copy_datagram_msg(skb, 0, msg, copied);
+	if (unlikely(err)) {
+		kfree_skb(skb);
+		return err;
+	}
+
+	serr = SKB_EXT_ERR(skb);
+
+	memcpy(&errhdr.ee, &serr->ee, sizeof(struct sock_extended_err));
+
+	put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(errhdr), &errhdr);
+
+	msg->msg_flags |= MSG_ERRQUEUE;
+	err = copied;
+
+	consume_skb(skb);
+out:
+	return err;
+}
+
 static int unix_stream_read_actor(struct sk_buff *skb,
 				  int skip, int chunk,
 				  struct unix_stream_read_state *state)
@@ -2769,6 +2836,10 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
 		return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
 					    flags & ~MSG_DONTWAIT, NULL);
 #endif
+
+	if (unlikely(flags & MSG_ERRQUEUE))
+		return unix_recv_error(sk, msg, size);
+
 	return unix_stream_read_generic(&state, true);
 }
 
@@ -2776,6 +2847,15 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
 				    int skip, int chunk,
 				    struct unix_stream_read_state *state)
 {
+	/* Zerocopy pages cannot be spliced, alas. It looks like the splice interface
+	 * gives no way to notify about actual page consumption, so we have to copy.
+	 * This path is not going to stay legit: the sender will be notified and will stop zerocopying.
+	 */
+	int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
+
+	if (err)
+		return err;
+
 	return skb_splice_bits(skb, state->socket->sk,
 			       UNIXCB(skb).consumed + skip,
 			       state->pipe, chunk, state->splice_flags);
@@ -2962,7 +3042,7 @@ static __poll_t unix_poll(struct file *file, struct socket *sock, poll_table *wa
 	mask = 0;
 
 	/* exceptional events? */
-	if (sk->sk_err)
+	if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
 		mask |= EPOLLERR;
 	if (sk->sk_shutdown == SHUTDOWN_MASK)
 		mask |= EPOLLHUP;
-- 
1.8.3.1
