[Devel] [PATCH RHEL9 COMMIT] net: zerocopy over unix sockets
Konstantin Khorenko
khorenko at virtuozzo.com
Thu Jan 11 19:15:35 MSK 2024
The commit is pushed to "branch-rh9-5.14.0-362.8.1.vz9.35.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh9-5.14.0-362.8.1.vz9.35.6
------>
commit 137e8807d5b005a1c16aaa635c1dceb9928f5272
Author: Alexey Kuznetsov <kuznet at virtuozzo.com>
Date: Tue Jan 9 21:38:08 2024 +0800
net: zerocopy over unix sockets
The observation is that af_unix sockets have become slower
and eat a lot more CPU than 100G Ethernet. So, implement
MSG_ZEROCOPY over af_unix sockets to be able to talk to
local services without a collapse in performance.
Unexpectedly, this makes sense! For example, zerocopy cannot be done
in TCP over loopback, because skbs change ownership when passing
over loopback. But unix sockets are traditionally implemented
in such a way that skbs queued on the destination socket still remain
owned by the source socket, so that completion notifications
can be delivered back to the sender without problems.
The code looks neat and clean and might be submitted to mainstream
one day. One bit of dirt is that it shares the IP_RECVERR
control message; however, this dirt is not mine but the
original author's, who used a protocol-level message for generic
socket-level notifications without any good reason, creating
a mess for users.
https://pmc.acronis.work/browse/VSTOR-79527
Signed-off-by: Alexey Kuznetsov <kuznet at acronis.com>
Feature: net: zerocopy over unix sockets
---
net/core/datagram.c | 2 +-
net/core/sock.c | 3 ++
net/unix/af_unix.c | 108 +++++++++++++++++++++++++++++++++++++++++++++-------
3 files changed, 98 insertions(+), 15 deletions(-)
diff --git a/net/core/datagram.c b/net/core/datagram.c
index 07f1c651542f..c59c751bed13 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -644,7 +644,7 @@ int __zerocopy_sg_from_iter(struct msghdr *msg, struct sock *sk,
skb->data_len += copied;
skb->len += copied;
skb->truesize += truesize;
- if (sk && sk->sk_type == SOCK_STREAM) {
+ if (sk && sk->sk_type == SOCK_STREAM && sk->sk_family != PF_UNIX) {
sk_wmem_queued_add(sk, truesize);
if (!skb_zcopy_pure(skb))
sk_mem_charge(sk, truesize);
diff --git a/net/core/sock.c b/net/core/sock.c
index 3276f74dc018..e53ec1279c14 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1447,6 +1447,9 @@ int sk_setsockopt(struct sock *sk, int level, int optname,
(sk->sk_type == SOCK_DGRAM &&
sk->sk_protocol == IPPROTO_UDP)))
ret = -EOPNOTSUPP;
+ } else if (sk->sk_family == PF_UNIX) {
+ if (sk->sk_type == SOCK_DGRAM)
+ ret = -EOPNOTSUPP;
} else if (sk->sk_family != PF_RDS) {
ret = -EOPNOTSUPP;
}
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index a2448274b7ad..8ebaaa41fa73 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -115,6 +115,7 @@
#include <linux/freezer.h>
#include <linux/file.h>
#include <linux/btf_ids.h>
+#include <linux/errqueue.h>
#include "scm.h"
@@ -2054,6 +2055,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
{
struct sock *sk = sock->sk;
struct sock *other = NULL;
+ struct ubuf_info *uarg = NULL;
int err, size;
struct sk_buff *skb;
int sent = 0;
@@ -2083,22 +2085,37 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto pipe_err;
+ if ((msg->msg_flags & MSG_ZEROCOPY) && len && sock_flag(sk, SOCK_ZEROCOPY)) {
+ uarg = msg_zerocopy_alloc(sk, len);
+ if (!uarg) {
+ err = -ENOBUFS;
+ goto out_err;
+ }
+ }
+
while (sent < len) {
size = len - sent;
- /* Keep two messages in the pipe so it schedules better */
- size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+ if (!uarg) {
+ /* Keep two messages in the pipe so it schedules better */
+ size = min_t(int, size, (sk->sk_sndbuf >> 1) - 64);
+
+ /* allow fallback to order-0 allocations */
+ size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
- /* allow fallback to order-0 allocations */
- size = min_t(int, size, SKB_MAX_HEAD(0) + UNIX_SKB_FRAGS_SZ);
+ data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
- data_len = max_t(int, 0, size - SKB_MAX_HEAD(0));
+ data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
- data_len = min_t(size_t, size, PAGE_ALIGN(data_len));
+ skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
+ msg->msg_flags & MSG_DONTWAIT, &err,
+ get_order(UNIX_SKB_FRAGS_SZ));
+ } else {
+ size = min_t(int, size, sk->sk_sndbuf);
+ skb = sock_alloc_send_pskb(sk, 0, 0,
+ msg->msg_flags & MSG_DONTWAIT, &err, 0);
+ }
- skb = sock_alloc_send_pskb(sk, size - data_len, data_len,
- msg->msg_flags & MSG_DONTWAIT, &err,
- get_order(UNIX_SKB_FRAGS_SZ));
if (!skb)
goto out_err;
@@ -2110,10 +2127,18 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
}
fds_sent = true;
- skb_put(skb, size - data_len);
- skb->data_len = data_len;
- skb->len = size;
- err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+ if (!uarg) {
+ skb_put(skb, size - data_len);
+ skb->data_len = data_len;
+ skb->len = size;
+ err = skb_copy_datagram_from_iter(skb, 0, &msg->msg_iter, size);
+ } else {
+ err = skb_zerocopy_iter_stream(sk, skb, msg, size, uarg);
+ if (err > 0) {
+ size = err;
+ err = 0;
+ }
+ }
if (err) {
kfree_skb(skb);
goto out_err;
@@ -2133,6 +2158,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
sent += size;
}
+ net_zcopy_put(uarg);
scm_destroy(&scm);
return sent;
@@ -2145,6 +2171,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
send_sig(SIGPIPE, current, 0);
err = -EPIPE;
out_err:
+ net_zcopy_put_abort(uarg, true);
scm_destroy(&scm);
return sent ? : err;
}
@@ -2708,6 +2735,46 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
return copied ? : err;
}
+static int unix_recv_error(struct sock *sk, struct msghdr *msg, int len)
+{
+ struct sock_exterr_skb *serr;
+ struct sk_buff *skb;
+ struct {
+ struct sock_extended_err ee;
+ } errhdr;
+ int err;
+ int copied;
+
+ err = -EAGAIN;
+ skb = sock_dequeue_err_skb(sk);
+ if (!skb)
+ goto out;
+
+ copied = skb->len;
+ if (copied > len) {
+ msg->msg_flags |= MSG_TRUNC;
+ copied = len;
+ }
+ err = skb_copy_datagram_msg(skb, 0, msg, copied);
+ if (unlikely(err)) {
+ kfree_skb(skb);
+ return err;
+ }
+
+ serr = SKB_EXT_ERR(skb);
+
+ memcpy(&errhdr.ee, &serr->ee, sizeof(struct sock_extended_err));
+
+ put_cmsg(msg, SOL_IP, IP_RECVERR, sizeof(errhdr), &errhdr);
+
+ msg->msg_flags |= MSG_ERRQUEUE;
+ err = copied;
+
+ consume_skb(skb);
+out:
+ return err;
+}
+
static int unix_stream_read_actor(struct sk_buff *skb,
int skip, int chunk,
struct unix_stream_read_state *state)
@@ -2752,6 +2819,10 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
return prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, NULL);
#endif
+
+ if (unlikely(flags & MSG_ERRQUEUE))
+ return unix_recv_error(sk, msg, size);
+
return unix_stream_read_generic(&state, true);
}
@@ -2759,6 +2830,15 @@ static int unix_stream_splice_actor(struct sk_buff *skb,
int skip, int chunk,
struct unix_stream_read_state *state)
{
+ /* Zerocopy pages cannot be spliced, alas. It looks like splice interface
+ * gives no way to notify about actual page consumption. So, we have to copy.
+ * This path is not going to be legit; the sender will be notified and will stop zerocopying.
+ */
+ int err = skb_orphan_frags_rx(skb, GFP_KERNEL);
+
+ if (err)
+ return err;
+
return skb_splice_bits(skb, state->socket->sk,
UNIXCB(skb).consumed + skip,
state->pipe, chunk, state->splice_flags);
@@ -2945,7 +3025,7 @@ static __poll_t unix_poll(struct file *file, struct socket *sock, poll_table *wa
mask = 0;
/* exceptional events? */
- if (sk->sk_err)
+ if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
mask |= EPOLLERR;
if (sk->sk_shutdown == SHUTDOWN_MASK)
mask |= EPOLLHUP;
More information about the Devel
mailing list