[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: make pcs rpc socket independent

Konstantin Khorenko khorenko at virtuozzo.com
Thu Oct 15 10:37:34 MSK 2020


The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 2bf7282221e5a19fcf3b71799d7dd19a207f8f8d
Author: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Date:   Thu Oct 15 10:37:34 2020 +0300

    fs/fuse kio: make pcs rpc socket independent
    
    This is an important step towards implementation of RDMA transport.
    Direct references to socket transport are replaced with an
    abstract network transport.
    
    https://pmc.acronis.com/browse/VSTOR-4310
    
    Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
    Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
---
 fs/fuse/kio/pcs/pcs_cs.c        |   6 +-
 fs/fuse/kio/pcs/pcs_ioconn.h    |  11 +++
 fs/fuse/kio/pcs/pcs_net.h       |  56 ++++++++++++
 fs/fuse/kio/pcs/pcs_rpc.c       |  70 ++++++++-------
 fs/fuse/kio/pcs/pcs_rpc.h       |   6 +-
 fs/fuse/kio/pcs/pcs_sock_conn.c |  30 ++++---
 fs/fuse/kio/pcs/pcs_sock_io.c   | 191 ++++++++++++++++++++++------------------
 fs/fuse/kio/pcs/pcs_sock_io.h   |  55 +++---------
 8 files changed, 243 insertions(+), 182 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 7ecd54fcf494..57b07df00f55 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -437,6 +437,8 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
 
 static void cs_connect(struct pcs_rpc *ep)
 {
+	void (*connect_start)(struct pcs_rpc *);
+
 	if (ep->flags & PCS_RPC_F_LOCAL) {
 		char path[128];
 
@@ -453,6 +455,7 @@ static void cs_connect(struct pcs_rpc *ep)
 		ep->sh.sun.sun_family = AF_UNIX;
 		ep->sh.sa_len = sizeof(struct sockaddr_un);
 		strcpy(ep->sh.sun.sun_path, path);
+		connect_start = pcs_sockconnect_start;
 	} else {
 		/* TODO: print sock addr using pcs_format_netaddr() */
 		if (ep->addr.type != PCS_ADDRTYPE_RDMA) {
@@ -460,6 +463,7 @@ static void cs_connect(struct pcs_rpc *ep)
 				TRACE("netaddr to sockaddr failed");
 				goto fail;
 			}
+			connect_start = pcs_sockconnect_start;
 		} else {
 			WARN_ON_ONCE(1);
 			/* TODO: rdma connect init */
@@ -467,7 +471,7 @@ static void cs_connect(struct pcs_rpc *ep)
 		}
 	}
 	ep->state = PCS_RPC_CONNECT;
-	pcs_sockconnect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+	connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
 	return;
 fail:
 	pcs_rpc_reset(ep);
diff --git a/fs/fuse/kio/pcs/pcs_ioconn.h b/fs/fuse/kio/pcs/pcs_ioconn.h
new file mode 100644
index 000000000000..93806e273a73
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_ioconn.h
@@ -0,0 +1,11 @@
+#ifndef _PCS_IOCONN_H_
+#define _PCS_IOCONN_H_ 1
+
+#include "pcs_types.h"
+
+struct pcs_ioconn
+{
+	void(*destruct)(struct pcs_ioconn *); /* called in pcs_ioconn_unregister() */
+};
+
+#endif /* _PCS_IOCONN_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_net.h b/fs/fuse/kio/pcs/pcs_net.h
new file mode 100644
index 000000000000..64a20be482f3
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_net.h
@@ -0,0 +1,56 @@
+#ifndef _PCS_NET_H_
+#define _PCS_NET_H_ 1
+
+#include "pcs_types.h"
+#include "pcs_ioconn.h"
+
+struct pcs_msg;
+struct pcs_netio;
+struct pcs_rpc;
+
+struct pcs_netio_tops {
+	/* suspend polling events on netio->ioconn.fd */
+	void  (*throttle)(struct pcs_netio *netio);
+
+	/* resume polling events on netio->ioconn.fd */
+	void  (*unthrottle)(struct pcs_netio *netio);
+
+	/* queue message for sending */
+	void  (*send_msg)(struct pcs_netio *netio, struct pcs_msg *msg);
+
+	/* try to cancel message send */
+	int   (*cancel_msg)(struct pcs_msg *msg);
+
+	/* tear down connection, finalize all in-flight messages with error */
+	void  (*abort_io)(struct pcs_netio *netio, int error);
+
+	/* try to transmit messages */
+	void  (*xmit)(struct pcs_netio *netio);
+
+	/* try to flush messages */
+	int   (*flush)(struct pcs_netio *netio);
+
+	/* get next timeout */
+	unsigned long (*next_timeout)(struct pcs_netio *netio);
+};
+
+struct pcs_netio {
+	struct pcs_ioconn ioconn;
+	struct pcs_rpc *parent;
+
+	/* transport methods */
+	struct pcs_netio_tops *tops;
+
+	/* callbacks */
+
+	/* create pcs_msg by inline_buffer pointing to the head of new incoming message */
+	struct pcs_msg *(*getmsg)(struct pcs_netio *netio, char *inline_buffer,
+				  u32 *msg_size);
+
+	/* report "connection closed" event: graceful shutdown or abort_io. Note that
+	 * the handler could be called twice: once on graceful shutdown and from abort_io()
+	 */
+	void  (*eof)(struct pcs_netio *netio);
+};
+
+#endif /* _PCS_NET_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index d2ad9537418a..114e6bf59e7c 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -23,7 +23,6 @@
 
 
 #include "pcs_types.h"
-#include "pcs_sock_io.h"
 #include "pcs_rpc.h"
 #include "pcs_cluster.h"
 #include "log.h"
@@ -202,14 +201,17 @@ void rpc_abort(struct pcs_rpc * ep, int fatal, int error)
 
 	if (ep->conn) {
 		struct pcs_ioconn * ioconn = ep->conn;
-		struct pcs_sockio * sio = sio_from_ioconn(ioconn);
 
 		ep->conn = NULL;
 		if (ep->gc)
 			list_lru_del(&ep->gc->lru, &ep->lru_link);
 
-		sio->eof = NULL;
-		pcs_sock_error(sio, error);
+		/* TODO: Add support of PCS_RPC_CONNECT state */
+		if (state != PCS_RPC_CONNECT) {
+			struct pcs_netio *netio = (struct pcs_netio *)ioconn;
+			netio->tops->abort_io(netio, error);
+		}
+
 		if (ioconn->destruct)
 			ioconn->destruct(ioconn);
 	}
@@ -357,9 +359,9 @@ void __pcs_rpc_put(struct pcs_rpc *ep)
 		queue_work(pcs_cleanup_wq, &rpc_cleanup_work);
 }
 
-void rpc_eof_cb(struct pcs_sockio * sio)
+void rpc_eof_cb(struct pcs_netio * netio)
 {
-	struct pcs_rpc * ep = sio->parent;
+	struct pcs_rpc * ep = netio->parent;
 
 	if (WARN_ON_ONCE(ep == NULL))
 		return;
@@ -367,7 +369,7 @@ void rpc_eof_cb(struct pcs_sockio * sio)
 	/* Dead socket is finally closed, we could already open another one.
 	 * I feel inconvenient about this.
 	 */
-	if (&sio->ioconn != ep->conn)
+	if (&netio->ioconn != ep->conn)
 		return;
 
 	rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
@@ -403,8 +405,8 @@ void pcs_rpc_error_respond(struct pcs_rpc * ep, struct pcs_msg * msg, int err)
 
 	eresp = pcs_rpc_alloc_error_response(ep, h, err, sizeof(struct pcs_rpc_error_resp));
 	if (eresp) {
-		struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-		pcs_sock_sendmsg(sio, eresp);
+		struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+		netio->tops->send_msg(netio, eresp);
 	}
 }
 
@@ -555,10 +557,10 @@ void rpc_work_input(struct pcs_msg * msg)
 	pcs_free_msg(msg);
 }
 
-struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size)
+struct pcs_msg *rpc_get_hdr(struct pcs_netio * netio, char *inline_buffer, u32 *msg_size)
 {
-	struct pcs_rpc * ep = sio->parent;
-	struct pcs_rpc_hdr * h = (struct pcs_rpc_hdr*)sio_inline_buffer(sio);
+	struct pcs_rpc * ep = netio->parent;
+	struct pcs_rpc_hdr * h = (struct pcs_rpc_hdr*)inline_buffer;
 	struct pcs_msg * msg;
 	void (*next_input)(struct pcs_msg *);
 
@@ -594,7 +596,7 @@ struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size)
 
 	msg = pcs_rpc_alloc_input_msg(ep, h->len);
 	if (!msg) {
-		pcs_sock_throttle(sio);
+		netio->tops->throttle(netio);
 		return NULL;
 	}
 
@@ -628,8 +630,6 @@ void pcs_rpc_connect(struct pcs_rpc * ep)
  */
 static void pcs_rpc_send(struct pcs_rpc * ep, struct pcs_msg * msg, bool requeue)
 {
-	struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
 	BUG_ON(!mutex_is_locked(&ep->mutex));
 	BUG_ON(msg->rpc != (requeue ? ep: NULL));
 
@@ -655,9 +655,10 @@ static void pcs_rpc_send(struct pcs_rpc * ep, struct pcs_msg * msg, bool requeue
 
 	if (ep->state == PCS_RPC_WORK) {
 		BUG_ON(ep->conn == NULL);
-		if (msg->size)
-			pcs_sock_sendmsg(sio, msg);
-		else {
+		if (msg->size) {
+			struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+			netio->tops->send_msg(netio, msg);
+		} else {
 			pcs_msg_del_calendar(msg);
 			msg->done(msg);
 		}
@@ -760,7 +761,7 @@ static void calendar_work(struct work_struct *w)
 		pcs_msg_del_calendar(msg);
 		switch (msg->stage) {
 		case PCS_MSG_STAGE_SEND:
-			if (pcs_sock_cancel_msg(msg)) {
+			if (msg->netio->tops->cancel_msg(msg)) {
 				/* The message is under network IO right now. We cannot kill it
 				 * without destruction of the whole connection. So, we just reschedule
 				 * kill. When IO will complete, it will be killed not even waiting
@@ -809,7 +810,7 @@ static void calendar_work(struct work_struct *w)
 
 static void update_xmit_timeout(struct pcs_rpc *ep)
 {
-	struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
+	struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
 	struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
 	struct pcs_msg * msg;
 	unsigned long timeout = 0;
@@ -817,7 +818,7 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
 
 	BUG_ON(ep->state != PCS_RPC_WORK);
 
-	if (list_empty(&ep->pending_queue) && list_empty(&sio->write_queue)) {
+	if (list_empty(&ep->pending_queue) && !netio->tops->next_timeout(netio)) {
 		if (timer_pending(&ep->timer_work.timer))
 			cancel_delayed_work(&ep->timer_work);
 		return;
@@ -827,9 +828,8 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
 
 		timeout = msg->start_time + ep->params.response_timeout;
 	}
-	if (!list_empty(&sio->write_queue)) {
-		msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
-		tx = msg->start_time + sio->send_timeout;
+	if (netio->tops->next_timeout(netio)) {
+		tx = netio->tops->next_timeout(netio);
 		if (time_after(tx, timeout))
 			timeout = tx;
 	}
@@ -839,7 +839,6 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
 		timeout -= jiffies;
 
 	mod_delayed_work(cc->wq, &ep->timer_work, timeout);
-
 }
 
 static void rpc_queue_work(struct work_struct *w)
@@ -861,9 +860,8 @@ static void rpc_queue_work(struct work_struct *w)
 
 	/* Process messages which are already in the sock queue */
 	if (ep->state == PCS_RPC_WORK) {
-		struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
-		pcs_sockio_xmit(sio);
+		struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+		netio->tops->xmit(netio);
 	}
 
 	/* Process delayed ones */
@@ -884,14 +882,16 @@ static void rpc_queue_work(struct work_struct *w)
 		list_del_init(&msg->list);
 		pcs_rpc_send(ep, msg, 1);
 	}
+
 	repeat = 0;
 	if (ep->state == PCS_RPC_WORK) {
-		struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
-		if (pcs_sockio_delayed_seg(sio))
+		struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+		if (netio->tops->flush(netio))
 			repeat = 1;
-		update_xmit_timeout(ep);
 	}
+	if (ep->state == PCS_RPC_WORK)
+		update_xmit_timeout(ep);
+
 	mutex_unlock(&ep->mutex);
 	if (repeat)
 		goto again;
@@ -1100,8 +1100,10 @@ void pcs_rpc_deaccount_msg(struct pcs_msg * msg)
 		if (ep->accounted == 0)
 			ep->eng->accounted_rpcs--;
 		msg->accounted = 0;
-		if (ep->state == PCS_RPC_WORK)
-			pcs_sock_unthrottle((struct pcs_sockio *)ep->conn);
+		if (ep->state == PCS_RPC_WORK) {
+			struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+			netio->tops->unthrottle(netio);
+		}
 	}
 	pcs_rpc_put(ep);
 }
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index ffa08cce7a18..1f3371a891dc 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -3,6 +3,7 @@
 
 //#include "pcs_defs.h"
 #include "pcs_rpc_prot.h"
+#include "pcs_net.h"
 #include "pcs_sock_io.h"
 
 struct pcs_msg;
@@ -285,7 +286,7 @@ void pcs_rpc_init_response(struct pcs_msg * msg, struct pcs_rpc_hdr * req_hdr, i
 
 /* Allocate message and initialize header */
 struct pcs_msg * pcs_rpc_alloc_msg_w_hdr(int type, int size);
-struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size);
+struct pcs_msg *rpc_get_hdr(struct pcs_netio * netio, char *inline_buffer, u32 *msg_size);
 
 void pcs_rpc_set_memlimits(struct pcs_rpc_engine * eng, u64 thresh, u64 limit);
 void pcs_rpc_account_adjust(struct pcs_msg * msg, int adjustment);
@@ -298,9 +299,8 @@ int pcs_cluster_id_eq(PCS_CLUSTER_ID_T *id1, PCS_CLUSTER_ID_T *id2);
 
 void rpc_trace_health(struct pcs_rpc * ep);
 void pcs_rpc_enumerate_rpc(struct pcs_rpc_engine *eng, void (*cb)(struct pcs_rpc *ep, void *arg), void *arg);
-void pcs_rpc_set_sock(struct pcs_rpc *ep, struct pcs_sockio * sio);
 void pcs_rpc_enable(struct pcs_rpc * ep, int error);
-void rpc_eof_cb(struct pcs_sockio *sio);
+void rpc_eof_cb(struct pcs_netio *netio);
 
 static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
 {
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.c b/fs/fuse/kio/pcs/pcs_sock_conn.c
index 039fcf80dea6..83043177fb92 100644
--- a/fs/fuse/kio/pcs/pcs_sock_conn.c
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.c
@@ -327,6 +327,7 @@ struct pcs_rpc_auth
 static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 {
 	struct pcs_rpc_engine *eng = ep->eng;
+	struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
 	struct pcs_rpc_auth *au;
 	size_t msg_sz = sizeof(struct pcs_rpc_auth) +
 			round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN) +
@@ -377,7 +378,7 @@ static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 	TRACE("state=%d, type=%d, len=%d, msg_sz: %lu",
 	      au->state, au->payload.type, au->payload.len, msg_sz);
 
-	err = send_buf(ep->conn->socket, (u8*)au, msg_sz);
+	err = send_buf(sio->socket, (u8*)au, msg_sz);
 	if (err)
 		TRACE("Can't send au msg, err: %d", err);
 	pcs_free_msg(msg);
@@ -387,6 +388,7 @@ static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 
 static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 {
+	struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
 	struct pcs_rpc_auth *au;
 	size_t fixed_sz = sizeof(struct pcs_rpc_auth) +
 			  round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN);
@@ -399,7 +401,7 @@ static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 		return -ENOMEM;
 	}
 
-	err = recv_buf(ep->conn->socket, msg->_inline_buffer, fixed_sz);
+	err = recv_buf(sio->socket, msg->_inline_buffer, fixed_sz);
 	if (err) {
 		TRACE("Can't recv auth msg(%d), err: %lu", err, fixed_sz);
 		goto fail;
@@ -440,7 +442,7 @@ static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
 		size_t rest_sz = au->hdr.len - fixed_sz;
 		while (rest_sz) {
 			size_t recv_sz = min(fixed_sz, rest_sz);
-			err = recv_buf(ep->conn->socket, msg->_inline_buffer,
+			err = recv_buf(sio->socket, msg->_inline_buffer,
 				       recv_sz);
 			if (err) {
 				TRACE("Can't recv auth msg(%d), err: %lu",
@@ -591,7 +593,6 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
 	iov_iter_kvec(&sio->write_iter, WRITE, NULL, 0, 0);
 	sio->hdr_max = sizeof(struct pcs_rpc_hdr);
 	sio->flags = sa->sa_family != AF_UNIX ? PCS_SOCK_F_CORK : 0;
-	INIT_LIST_HEAD(&sio->ioconn.list);
 
 	err = sock_create(sa->sa_family, SOCK_STREAM, 0, &sock);
 	if (err < 0) {
@@ -616,13 +617,14 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
 	cancel_delayed_work(&ep->timer_work);
 	ep->retries++;
 
-	ep->conn = &sio->ioconn;
-	sio->parent = pcs_rpc_get(ep);
-	sio->get_msg = rpc_get_hdr;
-	sio->eof = rpc_eof_cb;
+	ep->conn = &sio->netio.ioconn;
 	sio->send_timeout = PCS_SIO_TIMEOUT;
-	sio->ioconn.socket = sock;
-	sio->ioconn.destruct = pcs_sock_ioconn_destruct;
+	sio->socket = sock;
+	sio->netio.ioconn.destruct = pcs_sock_ioconn_destruct;
+	sio->netio.parent = pcs_rpc_get(ep);
+	sio->netio.tops = &pcs_sock_netio_tops;
+	sio->netio.getmsg = rpc_get_hdr;
+	sio->netio.eof = rpc_eof_cb;
 	if (ep->gc)
 		list_lru_add(&ep->gc->lru, &ep->lru_link);
 
@@ -646,10 +648,10 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
 	 * since this seems to be able to result in performance.
 	 */
 	WARN_ON_ONCE(sock->sk->sk_user_data);
-	sio->ioconn.orig.user_data = sock->sk->sk_user_data;
-	sio->ioconn.orig.data_ready = sock->sk->sk_data_ready;
-	sio->ioconn.orig.write_space = sock->sk->sk_write_space;
-	sio->ioconn.orig.error_report = sock->sk->sk_error_report;
+	sio->orig.user_data = sock->sk->sk_user_data;
+	sio->orig.data_ready = sock->sk->sk_data_ready;
+	sio->orig.write_space = sock->sk->sk_write_space;
+	sio->orig.error_report = sock->sk->sk_error_report;
 
 	sock->sk->sk_sndtimeo = PCS_SIO_TIMEOUT;
 	sock->sk->sk_allocation = GFP_NOFS;
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 4191943540a3..33feb33ecec0 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -12,7 +12,7 @@
 #include "log.h"
 
 
-static void sio_msg_sent(struct pcs_msg * msg)
+void pcs_msg_sent(struct pcs_msg * msg)
 {
 	msg->stage = PCS_MSG_STAGE_SENT;
 	if (msg->timeout) {
@@ -22,13 +22,13 @@ static void sio_msg_sent(struct pcs_msg * msg)
 	}
 }
 
-void sio_push(struct pcs_sockio * sio)
+static void sio_push(struct pcs_sockio * sio)
 {
-	TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->parent));
+	TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
 	if (sio->flags & PCS_SOCK_F_CORK) {
 		int optval = 1;
 		int ret;
-		ret = kernel_setsockopt(sio->ioconn.socket, SOL_TCP, TCP_NODELAY,
+		ret = kernel_setsockopt(sio->socket, SOL_TCP, TCP_NODELAY,
 					(char *)&optval, sizeof(optval));
 		if (ret)
 			TRACE("kernel_setsockopt(TCP_NODELAY) failed: %d",  ret);
@@ -36,18 +36,18 @@ void sio_push(struct pcs_sockio * sio)
 	}
 }
 
-static void pcs_ioconn_unregister(struct pcs_ioconn *ioconn)
+static void pcs_ioconn_unregister(struct pcs_sockio *sio)
 {
-	if (!test_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags))
-		set_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags);
+	if (!test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
+		set_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags);
 }
 
-static void pcs_ioconn_close(struct pcs_ioconn *ioconn)
+static void pcs_ioconn_close(struct pcs_sockio *sio)
 {
-	kernel_sock_shutdown(ioconn->socket, SHUT_RDWR);
+	kernel_sock_shutdown(sio->socket, SHUT_RDWR);
 }
 
-void sio_abort(struct pcs_sockio * sio, int error)
+static void sio_abort(struct pcs_sockio * sio, int error)
 {
 	if (sio->current_msg) {
 		pcs_free_msg(sio->current_msg);
@@ -59,32 +59,23 @@ void sio_abort(struct pcs_sockio * sio, int error)
 		struct pcs_msg * msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
 		list_del(&msg->list);
 		sio->write_queue_len -= msg->size;
-		sio_msg_sent(msg);
+		pcs_msg_sent(msg);
 
 		pcs_set_local_error(&msg->error, error);
 		BUG_ON(!hlist_unhashed(&msg->kill_link));
 		msg->done(msg);
 	}
-	pcs_ioconn_unregister(&sio->ioconn);
-	pcs_ioconn_close(&sio->ioconn);
+	pcs_ioconn_unregister(sio);
+	pcs_ioconn_close(sio);
 	pcs_set_local_error(&sio->error, error);
-	if (sio->eof) {
-		void (*eof)(struct pcs_sockio *) = sio->eof;
-		sio->eof = NULL;
-		(*eof)(sio);
+	if (sio->netio.eof) {
+		void (*eof)(struct pcs_netio *) = sio->netio.eof;
+		sio->netio.eof = NULL;
+		(*eof)(&sio->netio);
 	}
 }
 
-
-void pcs_sock_abort(struct pcs_sockio * sio)
-{
-	if (!sio)
-		return;
-
-	sio_abort(sio, PCS_ERR_NET_ABORT);
-}
-
-void pcs_sock_error(struct pcs_sockio * sio, int error)
+static void pcs_sock_error(struct pcs_sockio * sio, int error)
 {
 	sio_abort(sio, error);
 }
@@ -195,7 +186,7 @@ static int do_sock_recv(struct socket *sock, void *buf, size_t len)
 	rcu_read_lock();
 	sio = rcu_dereference_sk_user_data(sock->sk);
 	if (sio) {
-		TRACE("RET: "PEER_FMT" len:%ld ret:%d\n", PEER_ARGS(sio->parent),
+		TRACE("RET: "PEER_FMT" len:%ld ret:%d\n", PEER_ARGS(sio->netio.parent),
 		      len, ret);
 	}
 	rcu_read_unlock();
@@ -212,20 +203,19 @@ static int do_sock_recv_callback(struct kvec *vec, void *context)
 
 static void pcs_sockio_recv(struct pcs_sockio *sio)
 {
-	struct pcs_ioconn* conn = &sio->ioconn;
 	struct iov_iter *it = &sio->read_iter;
-	struct pcs_rpc *ep = sio->parent;
+	struct pcs_rpc *ep = sio->netio.parent;
 	int count = 0;
 	u32 msg_size;
 	unsigned long loop_timeout = jiffies + PCS_SIO_SLICE;
 
 	TRACE("ENTER:" PEER_FMT " sio:%p cur_msg:%p\n", PEER_ARGS(ep), sio, sio->current_msg);
 
-	while(!test_bit(PCS_IOCONN_BF_DEAD, &conn->flags)) {
+	while(!test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags)) {
 		int n;
 		struct pcs_msg * msg;
 
-		if (test_bit(PCS_IOCONN_BF_ERROR, &conn->flags)) {
+		if (test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags)) {
 			sio_abort(sio, PCS_ERR_NET_ABORT);
 			return;
 		}
@@ -238,14 +228,14 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 			n = 0;
 
 			if (copy)
-				n = do_sock_recv(conn->socket, (char *)sio_inline_buffer(sio) + sio->hdr_ptr, copy);
+				n = do_sock_recv(sio->socket, (char *)sio_inline_buffer(sio) + sio->hdr_ptr, copy);
 
 			if (n > 0 || n == copy /* recv return 0 when copy is 0 */) {
 				sio->hdr_ptr += n;
 				if(sio->hdr_ptr != sio->hdr_max)
 					return;
 
-				msg = sio->get_msg(sio, &msg_size);
+				msg = sio->netio.getmsg(&sio->netio, sio_inline_buffer(sio), &msg_size);
 				if (msg == NULL) {
 					if (sio->hdr_ptr < sio->hdr_max)
 						continue;
@@ -286,7 +276,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 
 				n = iov_iter_for_each_range(it, min(iov_iter_single_seg_count(it),
 							(size_t)(msg_size - sio->read_offset)), do_sock_recv_callback,
-						conn->socket);
+						sio->socket);
 				if (n > 0) {
 					sio->read_offset += n;
 					iov_iter_advance(it, n);
@@ -314,8 +304,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
 
 static void pcs_sockio_send(struct pcs_sockio *sio)
 {
-	struct pcs_rpc *ep __maybe_unused = sio->parent;
-	struct pcs_ioconn* conn = &sio->ioconn;
+	struct pcs_rpc *ep __maybe_unused = sio->netio.parent;
 	struct iov_iter *it = &sio->write_iter;
 	unsigned long loop_timeout = jiffies + PCS_SIO_SLICE;
 	struct pcs_msg * msg;
@@ -329,14 +318,14 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 
 		/* This is original check, but it is not clear how connection can becomes
 		   dead before sio_abort() was called. Let's simplify it with BUG_ON
-		if (conn->dead) {
+		if (sio->dead) {
 			pcs_set_local_error(&msg->error, PCS_ERR_NET_ABORT);
 			goto done;
 		}
 		*/
-		BUG_ON(test_bit(PCS_IOCONN_BF_DEAD, &conn->flags));
+		BUG_ON(test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags));
 
-		if (test_bit(PCS_IOCONN_BF_ERROR, &conn->flags)) {
+		if (test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags)) {
 			sio_abort(sio, PCS_ERR_NET_ABORT);
 			return;
 		}
@@ -354,8 +343,8 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 				msg->get_iter(msg, sio->write_offset, it, WRITE);
 			}
 			BUG_ON(iov_iter_count(it) > left);
-			n = do_send_one_seg(conn->socket, it, iov_iter_single_seg_count(it),
-					iov_iter_single_seg_count(it) < left);
+			n = do_send_one_seg(sio->socket, it, iov_iter_single_seg_count(it),
+					    iov_iter_single_seg_count(it) < left);
 			if (n > 0) {
 				sio->write_offset += n;
 				iov_iter_advance(it, n);
@@ -383,7 +372,7 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 		}
 		sio->write_offset = 0;
 		iov_iter_kvec(it, WRITE, NULL, 0, 0);
-		sio_msg_sent(msg);
+		pcs_msg_sent(msg);
 		msg->done(msg);
 		if (++count >= PCS_SIO_PREEMPT_LIMIT ||
 		    time_is_before_jiffies(loop_timeout)) {
@@ -395,9 +384,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 		sio_push(sio);
 }
 
-void pcs_sockio_xmit(struct pcs_sockio *sio)
+static void pcs_sockio_xmit(struct pcs_netio *netio)
 {
-	struct pcs_rpc *ep = sio->parent;
+	struct pcs_sockio *sio = sio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
 
 	BUG_ON(!mutex_is_locked(&ep->mutex));
 
@@ -406,13 +396,15 @@ void pcs_sockio_xmit(struct pcs_sockio *sio)
 	pcs_sockio_send(sio);
 }
 
-int pcs_sockio_delayed_seg(struct pcs_sockio *sio)
+static int pcs_sockio_flush(struct pcs_netio *netio)
 {
+	struct pcs_sockio *sio = sio_from_netio(netio);
 	return sio->flags & (PCS_SOCK_F_POOLOUT|PCS_SOCK_F_POOLIN);
 }
 
-void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
+static void pcs_sock_sendmsg(struct pcs_netio *netio, struct pcs_msg *msg)
 {
+	struct pcs_sockio *sio = sio_from_netio(netio);
 	int was_idle = list_empty(&sio->write_queue);
 
 	DTRACE("sio(%p) msg:%p\n", sio, msg);
@@ -422,7 +414,7 @@ void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
 		msg->done(msg);
 		return;
 	}
-	msg->sio = sio;
+	msg->netio = &sio->netio;
 
 	list_add_tail(&msg->list, &sio->write_queue);
 	sio->write_queue_len += msg->size;
@@ -441,11 +433,11 @@ void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
 /* Try to cancel message send. If it is impossible, because message is in the middle
  * of write, so nothing and return an error.
  */
-int pcs_sock_cancel_msg(struct pcs_msg * msg)
+static int pcs_sock_cancel_msg(struct pcs_msg * msg)
 {
-	struct pcs_sockio * sio = msg->sio;
+	struct pcs_sockio * sio = sio_from_netio(msg->netio);
 
-	BUG_ON(msg->sio == NULL);
+	BUG_ON(msg->netio == NULL);
 
 	if (sio->write_queue.next == &msg->list) {
 		if (sio->write_offset)
@@ -465,21 +457,16 @@ int pcs_sock_cancel_msg(struct pcs_msg * msg)
 	return 0;
 }
 
-int pcs_sock_queuelen(struct pcs_sockio * sio)
+static void pcs_restore_sockets(struct pcs_sockio *sio)
 {
-	return sio->write_queue_len;
-}
-
-void pcs_restore_sockets(struct pcs_ioconn *ioconn)
-{
-	struct sock *sk = ioconn->socket->sk;
+	struct sock *sk = sio->socket->sk;
 
 	write_lock_bh(&sk->sk_callback_lock);
 	if (sk->sk_user_data) {
-		rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
-		sk->sk_data_ready = ioconn->orig.data_ready;
-		sk->sk_write_space = ioconn->orig.write_space;
-		sk->sk_error_report = ioconn->orig.error_report;
+		rcu_assign_sk_user_data(sk, sio->orig.user_data);
+		sk->sk_data_ready = sio->orig.data_ready;
+		sk->sk_write_space = sio->orig.write_space;
+		sk->sk_error_report = sio->orig.error_report;
 		//sock->sk->sk_state_change = pcs_state_chage;
 	}
 	write_unlock_bh(&sk->sk_callback_lock);
@@ -491,7 +478,7 @@ void pcs_restore_sockets(struct pcs_ioconn *ioconn)
 static void sio_destroy_rcu(struct rcu_head *head)
 {
 	struct pcs_sockio *sio = container_of(head, struct pcs_sockio, rcu);
-	struct pcs_rpc *ep = sio->parent;
+	struct pcs_rpc *ep = sio->netio.parent;
 
 	pcs_rpc_put(ep);
 	memset(sio, 0xFF, sizeof(*sio));
@@ -508,10 +495,10 @@ void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
 	BUG_ON(!list_empty(&sio->write_queue));
 	BUG_ON(sio->write_queue_len);
 
-	if (ioconn->socket) {
-		pcs_restore_sockets(ioconn);
-		sock_release(ioconn->socket);
-		ioconn->socket = NULL;
+	if (sio->socket) {
+		pcs_restore_sockets(sio);
+		sock_release(sio->socket);
+		sio->socket = NULL;
 	}
 
 	/* Wait pending socket callbacks, e.g., sk_data_ready() */
@@ -527,7 +514,7 @@ static void pcs_sk_kick_queue(struct sock *sk)
 	rcu_read_lock();
 	sio = rcu_dereference_sk_user_data(sk);
 	if (sio) {
-		struct pcs_rpc *ep = sio->parent;
+		struct pcs_rpc *ep = sio->netio.parent;
 		TRACE(PEER_FMT" queue\n", PEER_ARGS(ep));
 		pcs_rpc_kick_queue(ep);
 	}
@@ -553,34 +540,27 @@ void pcs_sk_error_report(struct sock *sk)
 	rcu_read_lock();
 	sio = rcu_dereference_sk_user_data(sk);
 	if (sio) {
-		struct pcs_rpc *ep = sio->parent;
+		struct pcs_rpc *ep = sio->netio.parent;
 
-		if (test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags) ||
-		    test_bit(PCS_IOCONN_BF_ERROR, &sio->ioconn.flags))
+		if (test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags) ||
+		    test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags))
 			goto unlock;
 
-		set_bit(PCS_IOCONN_BF_ERROR, &sio->ioconn.flags);
+		set_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags);
 		pcs_rpc_kick_queue(ep);
 	}
 unlock:
 	rcu_read_unlock();
 }
 
-void pcs_sockio_start(struct pcs_sockio * sio)
-{
-	//// TODO: dmonakhov
-	////pcs_ioconn_register(&sio->ioconn);
-}
-
 static void pcs_deaccount_msg(struct pcs_msg * msg)
 {
-	msg->sio = NULL;
+	msg->netio = NULL;
 }
 
 static void pcs_account_msg(struct pcs_sockio * sio, struct pcs_msg * msg)
 {
-	msg->sio = sio;
-
+	msg->netio = &sio->netio;
 }
 
 static void pcs_msg_input_destructor(struct pcs_msg * msg)
@@ -632,7 +612,7 @@ struct pcs_msg * pcs_alloc_output_msg(int datalen)
 	if (msg) {
 		pcs_msg_io_init(msg);
 		msg->rpc = NULL;
-		msg->sio = NULL;
+		msg->netio = NULL;
 		msg->destructor = pcs_io_msg_output_destructor;
 		msg->get_iter = pcs_get_iter_inline;
 	}
@@ -729,24 +709,59 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int copy_len)
 	return clone;
 }
 
-void pcs_sock_throttle(struct pcs_sockio * sio)
+static void pcs_sock_throttle(struct pcs_netio *netio)
 {
+	struct pcs_sockio *sio = sio_from_netio(netio);
+
 	if ((sio->flags & PCS_SOCK_F_THROTTLE) ||
-	    test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags))
+	    test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
 		return;
 
-	DTRACE("Throttle on socket %p rpc=%p", sio, sio->parent);
+	DTRACE("Throttle on socket %p rpc=%p", sio, sio->netio.parent);
 	sio->flags |= PCS_SOCK_F_THROTTLE;
 }
 
-void pcs_sock_unthrottle(struct pcs_sockio * sio)
+static void pcs_sock_unthrottle(struct pcs_netio *netio)
 {
+	struct pcs_sockio *sio = sio_from_netio(netio);
+
 	if (!(sio->flags & PCS_SOCK_F_THROTTLE) ||
-	    test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags))
+	    test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
 		return;
 
-	DTRACE("Unthrottle on socket %p rpc=%p", sio, sio->parent);
+	DTRACE("Unthrottle on socket %p rpc=%p", sio, sio->netio.parent);
 	sio->flags &= ~PCS_SOCK_F_THROTTLE;
 	if ((sio->flags & PCS_SOCK_F_EOF))
 		return;
 }
+
+static void pcs_sock_abort_io(struct pcs_netio *netio, int error)
+{
+	struct pcs_sockio *sio = sio_from_netio(netio);
+
+	netio->eof = NULL;
+	pcs_sock_error(sio, error);
+}
+
+static unsigned long pcs_sock_next_timeout(struct pcs_netio *netio)
+{
+	struct pcs_sockio *sio = sio_from_netio(netio);
+	struct pcs_msg *msg;
+
+	if (list_empty(&sio->write_queue))
+		return 0;
+
+	msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
+	return msg->start_time + sio->send_timeout;
+}
+
+struct pcs_netio_tops pcs_sock_netio_tops = {
+	.throttle		= pcs_sock_throttle,
+	.unthrottle		= pcs_sock_unthrottle,
+	.send_msg		= pcs_sock_sendmsg,
+	.cancel_msg		= pcs_sock_cancel_msg,
+	.abort_io		= pcs_sock_abort_io,
+	.xmit			= pcs_sockio_xmit,
+	.flush			= pcs_sockio_flush,
+	.next_timeout		= pcs_sock_next_timeout,
+};
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 35bf12c9f53a..6487612eabce 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -5,10 +5,12 @@
 #include <linux/in.h>
 #include <linux/in6.h>
 #include <linux/un.h>
+#include <linux/fs.h>
 
 #include "pcs_types.h"
 ////#include "pcs_process.h"
 #include "pcs_error.h"
+#include "pcs_net.h"
 #include "log.h"
 
 #define PCS_MSG_MAX_CALENDAR	64
@@ -17,6 +19,8 @@
 #define PCS_SIO_PREEMPT_LIMIT	16
 #define PCS_SIO_SLICE (5 * HZ / 1000) /* 5ms */
 
+extern struct pcs_netio_tops pcs_sock_netio_tops;
+
 
 struct pcs_api_channel
 {
@@ -37,7 +41,7 @@ __pre_packed struct pcs_msg
 		struct pcs_msg	*response;	/* Consider removing. It can be done passing the second
 						 * argument to done();
 						 */
-		struct pcs_sockio *sio;
+		struct pcs_netio *netio;
 		struct pcs_rpc	*rpc;
 
 		int		size;
@@ -96,12 +100,13 @@ enum
 	PCS_IOCONN_BF_DEAD		= 0,
 	PCS_IOCONN_BF_ERROR		= 1, /* Notify from ->sk_error_report */
 };
-struct pcs_ioconn {
 
-	struct list_head	list;
-	struct socket		*socket;
+struct pcs_sockio
+{
+	struct pcs_netio	netio;
 
-	unsigned long		flags;		/* atomic bit ops */
+	struct socket		*socket;
+	unsigned long		io_flags;	/* atomic bit ops */
 	/* Save original socket->sk callbacks */
 	struct {
 		void			*user_data;
@@ -110,18 +115,10 @@ struct pcs_ioconn {
 		void			(*data_ready)(struct sock *sk);
 		void			(*write_space)(struct sock *sk);
 	} orig;
-	void(*destruct)(struct pcs_ioconn *);
-
-};
-
-struct pcs_sockio
-{
-	struct pcs_ioconn	ioconn;
 
 	struct list_head	write_queue;
 	int			write_queue_len;
 	spinlock_t		q_lock;
-	struct pcs_rpc		*parent;
 
 	pcs_error_t		error;
 	int			send_timeout;
@@ -138,32 +135,18 @@ struct pcs_sockio
 	struct iov_iter		read_iter;
 	struct iov_iter		write_iter;
 	struct mutex		mutex;
-	struct pcs_msg *	(*get_msg)(struct pcs_sockio *, u32 *);
-	/* eof() handler could be called twice: once on graceful socket shutdown and from sio_abort() */
-	void			(*eof)(struct pcs_sockio *);
 	void			(*write_wakeup)(struct pcs_sockio *);
 	struct rcu_head		rcu;
 
 	char			_inline_buffer[0];
 };
 
-#define sio_from_ioconn(conn) container_of(conn, struct pcs_sockio, ioconn)
-
-void pcs_sockio_start(struct pcs_sockio * sio);
-void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg);
-int pcs_sock_cancel_msg(struct pcs_msg * msg);
-void pcs_sockio_xmit(struct pcs_sockio *sio);
-int  pcs_sockio_delayed_seg(struct pcs_sockio *sio);
-int pcs_sock_queuelen(struct pcs_sockio * sio);
-void pcs_sock_abort(struct pcs_sockio * sio);
-void pcs_sock_error(struct pcs_sockio * sio, int error);
-
 void pcs_sk_data_ready(struct sock *sk);
 void pcs_sk_write_space(struct sock *sk);
 void pcs_sk_error_report(struct sock *sk);
 
-void pcs_sock_throttle(struct pcs_sockio * sio);
-void pcs_sock_unthrottle(struct pcs_sockio * sio);
+#define sio_from_netio(nio) container_of(nio, struct pcs_sockio, netio)
+#define sio_from_ioconn(conn) container_of(conn, struct pcs_sockio, netio.ioconn)
 
 struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * sio, int datalen);
 struct pcs_msg * pcs_alloc_output_msg(int datalen);
@@ -190,6 +173,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
 			iov_iter_get_kvec_callback, vec);
 }
 void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
+void pcs_msg_sent(struct pcs_msg * msg);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {
@@ -244,17 +228,4 @@ static inline void pcs_msg_io_fini(struct pcs_msg * msg)
 	BUG_ON(msg->_iocount != 0);
 }
 
-
-struct bufqueue;
-
-/**
-   Present a portion of @bq as a pcs_msg that may be passed to pcs_sock_sendmsg().
-   Reading data from the pcs_msg will drain @bq.
-
-   \param @bq the buffer queue with the data of a message
-   \param @size the length of the head of @bq that will be presented as a pcs_msg
-   \returns a pcs_msg that reads data from @bq
-*/
-struct pcs_msg* bufqueue_as_pcs_output_msg(struct bufqueue *bq, u32 size);
-
 #endif /* _PCS_SOCK_IO_H_ */


More information about the Devel mailing list