[Devel] [PATCH RHEL7 COMMIT] fuse kio: Skip processing of messages above PAGE_SIZE size

Konstantin Khorenko khorenko at virtuozzo.com
Mon Jun 25 19:04:24 MSK 2018


The commit is pushed to "branch-rh7-3.10.0-693.21.1.vz7.50.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.21.1.vz7.50.15
------>
commit d68a2b457e9e0f06c36c8df73dcfcd4c29c15ca8
Author: Kirill Tkhai <ktkhai at virtuozzo.com>
Date:   Mon Jun 25 19:04:24 2018 +0300

    fuse kio: Skip processing of messages above PAGE_SIZE size
    
    This patch introduces a new PCS_TRASH_MSG marker to denote messages
    which are going to be dropped. Since we do not need their content,
    but the content must still be read from the socket, we use a
    "trash buffer" just to be able to call do_sock_recv().
    
    https://pmc.acronis.com/browse/VSTOR-11208
    
    Signed-off-by: Kirill Tkhai <ktkhai at virtuozzo.com>
    
    ============================================
    Patchset description:
    fuse kio: Add trash message handler
    
    This patchset adds the ability to handle big messages,
    which we are going to drop, in a special way.
    It lets the code avoid allocating a message for them. Instead,
    we read such messages into a trash buffer, which removes
    the trash message data from the socket.
    
    All the changes in pcs_sockio_recv() are made in a generic way
    via helpers, so the shape of the branches and the algorithm remain
    the same.
    
    https://pmc.acronis.com/browse/VSTOR-11208
    
    Kirill Tkhai (2):
          fuse kio: Get msg size in second argument of pcs_sockio::get_msg()
          fuse kio: Skip processing of messages above PAGE_SIZE size
---
 fs/fuse/kio/pcs/pcs_rpc.c     |  5 +++++
 fs/fuse/kio/pcs/pcs_sock_io.c | 47 +++++++++++++++++++++++++++++++++++++------
 fs/fuse/kio/pcs/pcs_sock_io.h |  1 +
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 3294f6cdf360..04711ccee5fd 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -528,6 +528,11 @@ struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size)
 
 	}
 
+	if (h->len > PAGE_SIZE) {
+		pcs_log(0, "Received too big msg  %u\n", h->len);
+		return PCS_TRASH_MSG;
+	}
+
 	msg = pcs_rpc_alloc_input_msg(ep, h->len);
 	if (!msg) {
 		pcs_sock_throttle(sio);
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 907261400b36..a1798a8c09fc 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -105,6 +105,37 @@ void pcs_sock_error(struct pcs_sockio * sio, int error)
 	sio_abort(sio, error);
 }
 
+static char trash_buf[PAGE_SIZE];
+
+static void rcv_get_iter(struct pcs_msg *msg, int read_off, struct iov_iter *it)
+{
+	if (likely(msg != PCS_TRASH_MSG))
+		msg->get_iter(msg, read_off, it);
+}
+
+static struct page *rcv_iov_iter_kmap(struct pcs_msg *msg, struct iov_iter *it,
+				      void **buf, size_t *len)
+{
+	if (unlikely(msg == PCS_TRASH_MSG)) {
+		*buf = trash_buf;
+		*len = sizeof(trash_buf);
+		return NULL;
+	}
+
+	return iov_iter_kmap(it, buf, len);
+}
+
+static void rcv_iov_iter_advance(struct pcs_msg *msg, struct iov_iter *it, int n)
+{
+	if (likely(msg != PCS_TRASH_MSG))
+		iov_iter_advance(it, n);
+}
+
+static void rcv_msg_done(struct pcs_msg *msg)
+{
+	if (likely(msg != PCS_TRASH_MSG))
+		msg->done(msg);
+}
 static int do_send_one_seg(struct socket *sock, struct iov_iter *it, bool more)
 {
 	int ret;
@@ -198,7 +229,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 				sio->hdr_ptr = 0;
 				sio->current_msg = msg;
 				sio->current_msg_size = msg_size;
-				msg->get_iter(msg, sio->read_offset, it);
+				rcv_get_iter(msg, sio->read_offset, it);
 				TRACE(PEER_FMT" msg:%p read_off:%d iov_size:%ld\n", PEER_ARGS(ep), msg, sio->read_offset,
 				      iov_iter_count(it));
 			} else {
@@ -219,15 +250,16 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 
 				if (!iov_iter_count(it))
 					/* Current iter is exhausted, init new one */
-					msg->get_iter(msg, sio->read_offset, it);
+					rcv_get_iter(msg, sio->read_offset, it);
 
 				TRACE(PEER_FMT" msg:%p->size:%d off:%d it_count:%ld\n",
 				      PEER_ARGS(ep), msg, msg_size, sio->read_offset,
 				      iov_iter_count(it));
 
-				BUG_ON(iov_iter_count(it) > msg_size - sio->read_offset);
+				if (msg != PCS_TRASH_MSG)
+					BUG_ON(iov_iter_count(it) > msg_size - sio->read_offset);
 
-				page = iov_iter_kmap(it, &buf, &len);
+				page = rcv_iov_iter_kmap(msg, it, &buf, &len);
 				if (len > msg_size - sio->read_offset)
 					len = msg_size - sio->read_offset;
 				n = do_sock_recv(conn->socket, buf, len);
@@ -236,7 +268,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 
 				if (n > 0) {
 					sio->read_offset += n;
-					iov_iter_advance(it, n);
+					rcv_iov_iter_advance(msg, it, n);
 				} else {
 					if (n == -EAGAIN || n == 0)
 						return;
@@ -246,7 +278,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 			}
 			sio->current_msg = NULL;
 			iov_iter_init_bad(&sio->read_iter);
-			msg->done(msg);
+			rcv_msg_done(msg);
 			if (++count >= PCS_SIO_PREEMPT_LIMIT ||
 			    time_is_before_jiffies(loop_timeout)) {
 				sio->flags |= PCS_SOCK_F_POOLIN;
@@ -602,6 +634,9 @@ struct pcs_msg * pcs_alloc_output_msg(int datalen)
 
 void pcs_free_msg(struct pcs_msg * msg)
 {
+	if (msg == PCS_TRASH_MSG)
+		return;
+
 	pcs_msg_io_fini(msg);
 
 	if (msg->destructor)
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 83f5ca356707..e2fceee46383 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -126,6 +126,7 @@ struct pcs_sockio
 	u32			retrans;
 
 	struct pcs_msg		*current_msg;
+#define PCS_TRASH_MSG ((void *)~0UL)
 	u32			current_msg_size;
 	int			read_offset;
 	int			write_offset;


More information about the Devel mailing list