[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: make pcs rpc socket independent
Konstantin Khorenko
khorenko at virtuozzo.com
Thu Oct 15 10:37:34 MSK 2020
The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 2bf7282221e5a19fcf3b71799d7dd19a207f8f8d
Author: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Date: Thu Oct 15 10:37:34 2020 +0300
fs/fuse kio: make pcs rpc socket independent
This is an important step towards the implementation of an RDMA transport.
Direct references to the socket transport are replaced with an
abstract network transport.
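For illustration only, a minimal sketch of the dispatch pattern this change
introduces (example_queue_send() and the pcs_rdma_* names are hypothetical
and not part of this patch; the types come from the pcs_net.h added below).
The RPC layer now reaches the transport only through the pcs_netio_tops
table attached to the connection, so a future RDMA transport can plug in
its own ops without touching the callers:

	/* Sketch: a hypothetical caller using the abstract transport.
	 * Instead of calling pcs_sock_sendmsg() directly, the RPC layer
	 * dispatches through the ops table of whatever transport owns
	 * ep->conn (socket today, RDMA later).
	 */
	static void example_queue_send(struct pcs_rpc *ep, struct pcs_msg *msg)
	{
		struct pcs_netio *netio = (struct pcs_netio *)ep->conn;

		netio->tops->send_msg(netio, msg);
	}

	/* A future RDMA transport would supply its own table, e.g.
	 * (hypothetical):
	 *
	 *	struct pcs_netio_tops pcs_rdma_netio_tops = {
	 *		.send_msg = pcs_rdma_sendmsg,
	 *		.abort_io = pcs_rdma_abort_io,
	 *		...
	 *	};
	 */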
https://pmc.acronis.com/browse/VSTOR-4310
Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
---
fs/fuse/kio/pcs/pcs_cs.c | 6 +-
fs/fuse/kio/pcs/pcs_ioconn.h | 11 +++
fs/fuse/kio/pcs/pcs_net.h | 56 ++++++++++++
fs/fuse/kio/pcs/pcs_rpc.c | 70 ++++++++-------
fs/fuse/kio/pcs/pcs_rpc.h | 6 +-
fs/fuse/kio/pcs/pcs_sock_conn.c | 30 ++++---
fs/fuse/kio/pcs/pcs_sock_io.c | 191 ++++++++++++++++++++++------------------
fs/fuse/kio/pcs/pcs_sock_io.h | 55 +++---------
8 files changed, 243 insertions(+), 182 deletions(-)
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 7ecd54fcf494..57b07df00f55 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -437,6 +437,8 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
static void cs_connect(struct pcs_rpc *ep)
{
+ void (*connect_start)(struct pcs_rpc *);
+
if (ep->flags & PCS_RPC_F_LOCAL) {
char path[128];
@@ -453,6 +455,7 @@ static void cs_connect(struct pcs_rpc *ep)
ep->sh.sun.sun_family = AF_UNIX;
ep->sh.sa_len = sizeof(struct sockaddr_un);
strcpy(ep->sh.sun.sun_path, path);
+ connect_start = pcs_sockconnect_start;
} else {
/* TODO: print sock addr using pcs_format_netaddr() */
if (ep->addr.type != PCS_ADDRTYPE_RDMA) {
@@ -460,6 +463,7 @@ static void cs_connect(struct pcs_rpc *ep)
TRACE("netaddr to sockaddr failed");
goto fail;
}
+ connect_start = pcs_sockconnect_start;
} else {
WARN_ON_ONCE(1);
/* TODO: rdma connect init */
@@ -467,7 +471,7 @@ static void cs_connect(struct pcs_rpc *ep)
}
}
ep->state = PCS_RPC_CONNECT;
- pcs_sockconnect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+ connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
return;
fail:
pcs_rpc_reset(ep);
diff --git a/fs/fuse/kio/pcs/pcs_ioconn.h b/fs/fuse/kio/pcs/pcs_ioconn.h
new file mode 100644
index 000000000000..93806e273a73
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_ioconn.h
@@ -0,0 +1,11 @@
+#ifndef _PCS_IOCONN_H_
+#define _PCS_IOCONN_H_ 1
+
+#include "pcs_types.h"
+
+struct pcs_ioconn
+{
+ void(*destruct)(struct pcs_ioconn *); /* called in pcs_ioconn_unregister() */
+};
+
+#endif /* _PCS_IOCONN_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_net.h b/fs/fuse/kio/pcs/pcs_net.h
new file mode 100644
index 000000000000..64a20be482f3
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_net.h
@@ -0,0 +1,56 @@
+#ifndef _PCS_NET_H_
+#define _PCS_NET_H_ 1
+
+#include "pcs_types.h"
+#include "pcs_ioconn.h"
+
+struct pcs_msg;
+struct pcs_netio;
+struct pcs_rpc;
+
+struct pcs_netio_tops {
+ /* suspend polling events on netio->ioconn.fd */
+ void (*throttle)(struct pcs_netio *netio);
+
+ /* resume polling events on netio->ioconn.fd */
+ void (*unthrottle)(struct pcs_netio *netio);
+
+ /* queue message for sending */
+ void (*send_msg)(struct pcs_netio *netio, struct pcs_msg *msg);
+
+ /* try to cancel message send */
+ int (*cancel_msg)(struct pcs_msg *msg);
+
+ /* tear down connection, finalize all in-flight messages with error */
+ void (*abort_io)(struct pcs_netio *netio, int error);
+
+ /* try to transmit messages */
+ void (*xmit)(struct pcs_netio *netio);
+
+ /* try to flush messages */
+ int (*flush)(struct pcs_netio *netio);
+
+ /* get next timeout */
+ unsigned long (*next_timeout)(struct pcs_netio *netio);
+};
+
+struct pcs_netio {
+ struct pcs_ioconn ioconn;
+ struct pcs_rpc *parent;
+
+ /* transport methods */
+ struct pcs_netio_tops *tops;
+
+ /* callbacks */
+
+ /* create a pcs_msg from the inline_buffer pointing to the head of a new incoming message */
+ struct pcs_msg *(*getmsg)(struct pcs_netio *netio, char *inline_buffer,
+ u32 *msg_size);
+
+ /* report "connection closed" event: graceful shutdown or abort_io. Note that
+ * the handler may be called twice: once on graceful shutdown and once from abort_io()
+ */
+ void (*eof)(struct pcs_netio *netio);
+};
+
+#endif /* _PCS_NET_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index d2ad9537418a..114e6bf59e7c 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -23,7 +23,6 @@
#include "pcs_types.h"
-#include "pcs_sock_io.h"
#include "pcs_rpc.h"
#include "pcs_cluster.h"
#include "log.h"
@@ -202,14 +201,17 @@ void rpc_abort(struct pcs_rpc * ep, int fatal, int error)
if (ep->conn) {
struct pcs_ioconn * ioconn = ep->conn;
- struct pcs_sockio * sio = sio_from_ioconn(ioconn);
ep->conn = NULL;
if (ep->gc)
list_lru_del(&ep->gc->lru, &ep->lru_link);
- sio->eof = NULL;
- pcs_sock_error(sio, error);
+ /* TODO: Add support of PCS_RPC_CONNECT state */
+ if (state != PCS_RPC_CONNECT) {
+ struct pcs_netio *netio = (struct pcs_netio *)ioconn;
+ netio->tops->abort_io(netio, error);
+ }
+
if (ioconn->destruct)
ioconn->destruct(ioconn);
}
@@ -357,9 +359,9 @@ void __pcs_rpc_put(struct pcs_rpc *ep)
queue_work(pcs_cleanup_wq, &rpc_cleanup_work);
}
-void rpc_eof_cb(struct pcs_sockio * sio)
+void rpc_eof_cb(struct pcs_netio * netio)
{
- struct pcs_rpc * ep = sio->parent;
+ struct pcs_rpc * ep = netio->parent;
if (WARN_ON_ONCE(ep == NULL))
return;
@@ -367,7 +369,7 @@ void rpc_eof_cb(struct pcs_sockio * sio)
/* Dead socket is finally closed, we could already open another one.
* I feel inconvenient about this.
*/
- if (&sio->ioconn != ep->conn)
+ if (&netio->ioconn != ep->conn)
return;
rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
@@ -403,8 +405,8 @@ void pcs_rpc_error_respond(struct pcs_rpc * ep, struct pcs_msg * msg, int err)
eresp = pcs_rpc_alloc_error_response(ep, h, err, sizeof(struct pcs_rpc_error_resp));
if (eresp) {
- struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
- pcs_sock_sendmsg(sio, eresp);
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+ netio->tops->send_msg(netio, eresp);
}
}
@@ -555,10 +557,10 @@ void rpc_work_input(struct pcs_msg * msg)
pcs_free_msg(msg);
}
-struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size)
+struct pcs_msg *rpc_get_hdr(struct pcs_netio * netio, char *inline_buffer, u32 *msg_size)
{
- struct pcs_rpc * ep = sio->parent;
- struct pcs_rpc_hdr * h = (struct pcs_rpc_hdr*)sio_inline_buffer(sio);
+ struct pcs_rpc * ep = netio->parent;
+ struct pcs_rpc_hdr * h = (struct pcs_rpc_hdr*)inline_buffer;
struct pcs_msg * msg;
void (*next_input)(struct pcs_msg *);
@@ -594,7 +596,7 @@ struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size)
msg = pcs_rpc_alloc_input_msg(ep, h->len);
if (!msg) {
- pcs_sock_throttle(sio);
+ netio->tops->throttle(netio);
return NULL;
}
@@ -628,8 +630,6 @@ void pcs_rpc_connect(struct pcs_rpc * ep)
*/
static void pcs_rpc_send(struct pcs_rpc * ep, struct pcs_msg * msg, bool requeue)
{
- struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
BUG_ON(!mutex_is_locked(&ep->mutex));
BUG_ON(msg->rpc != (requeue ? ep: NULL));
@@ -655,9 +655,10 @@ static void pcs_rpc_send(struct pcs_rpc * ep, struct pcs_msg * msg, bool requeue
if (ep->state == PCS_RPC_WORK) {
BUG_ON(ep->conn == NULL);
- if (msg->size)
- pcs_sock_sendmsg(sio, msg);
- else {
+ if (msg->size) {
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+ netio->tops->send_msg(netio, msg);
+ } else {
pcs_msg_del_calendar(msg);
msg->done(msg);
}
@@ -760,7 +761,7 @@ static void calendar_work(struct work_struct *w)
pcs_msg_del_calendar(msg);
switch (msg->stage) {
case PCS_MSG_STAGE_SEND:
- if (pcs_sock_cancel_msg(msg)) {
+ if (msg->netio->tops->cancel_msg(msg)) {
/* The message is under network IO right now. We cannot kill it
* without destruction of the whole connection. So, we just reschedule
* kill. When IO will complete, it will be killed not even waiting
@@ -809,7 +810,7 @@ static void calendar_work(struct work_struct *w)
static void update_xmit_timeout(struct pcs_rpc *ep)
{
- struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
struct pcs_msg * msg;
unsigned long timeout = 0;
@@ -817,7 +818,7 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
BUG_ON(ep->state != PCS_RPC_WORK);
- if (list_empty(&ep->pending_queue) && list_empty(&sio->write_queue)) {
+ if (list_empty(&ep->pending_queue) && !netio->tops->next_timeout(netio)) {
if (timer_pending(&ep->timer_work.timer))
cancel_delayed_work(&ep->timer_work);
return;
@@ -827,9 +828,8 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
timeout = msg->start_time + ep->params.response_timeout;
}
- if (!list_empty(&sio->write_queue)) {
- msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
- tx = msg->start_time + sio->send_timeout;
+ if (netio->tops->next_timeout(netio)) {
+ tx = netio->tops->next_timeout(netio);
if (time_after(tx, timeout))
timeout = tx;
}
@@ -839,7 +839,6 @@ static void update_xmit_timeout(struct pcs_rpc *ep)
timeout -= jiffies;
mod_delayed_work(cc->wq, &ep->timer_work, timeout);
-
}
static void rpc_queue_work(struct work_struct *w)
@@ -861,9 +860,8 @@ static void rpc_queue_work(struct work_struct *w)
/* Process messages which are already in the sock queue */
if (ep->state == PCS_RPC_WORK) {
- struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
- pcs_sockio_xmit(sio);
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+ netio->tops->xmit(netio);
}
/* Process delayed ones */
@@ -884,14 +882,16 @@ static void rpc_queue_work(struct work_struct *w)
list_del_init(&msg->list);
pcs_rpc_send(ep, msg, 1);
}
+
repeat = 0;
if (ep->state == PCS_RPC_WORK) {
- struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
-
- if (pcs_sockio_delayed_seg(sio))
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+ if (netio->tops->flush(netio))
repeat = 1;
- update_xmit_timeout(ep);
}
+ if (ep->state == PCS_RPC_WORK)
+ update_xmit_timeout(ep);
+
mutex_unlock(&ep->mutex);
if (repeat)
goto again;
@@ -1100,8 +1100,10 @@ void pcs_rpc_deaccount_msg(struct pcs_msg * msg)
if (ep->accounted == 0)
ep->eng->accounted_rpcs--;
msg->accounted = 0;
- if (ep->state == PCS_RPC_WORK)
- pcs_sock_unthrottle((struct pcs_sockio *)ep->conn);
+ if (ep->state == PCS_RPC_WORK) {
+ struct pcs_netio *netio = (struct pcs_netio *)ep->conn;
+ netio->tops->unthrottle(netio);
+ }
}
pcs_rpc_put(ep);
}
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index ffa08cce7a18..1f3371a891dc 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -3,6 +3,7 @@
//#include "pcs_defs.h"
#include "pcs_rpc_prot.h"
+#include "pcs_net.h"
#include "pcs_sock_io.h"
struct pcs_msg;
@@ -285,7 +286,7 @@ void pcs_rpc_init_response(struct pcs_msg * msg, struct pcs_rpc_hdr * req_hdr, i
/* Allocate message and initialize header */
struct pcs_msg * pcs_rpc_alloc_msg_w_hdr(int type, int size);
-struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size);
+struct pcs_msg *rpc_get_hdr(struct pcs_netio * netio, char *inline_buffer, u32 *msg_size);
void pcs_rpc_set_memlimits(struct pcs_rpc_engine * eng, u64 thresh, u64 limit);
void pcs_rpc_account_adjust(struct pcs_msg * msg, int adjustment);
@@ -298,9 +299,8 @@ int pcs_cluster_id_eq(PCS_CLUSTER_ID_T *id1, PCS_CLUSTER_ID_T *id2);
void rpc_trace_health(struct pcs_rpc * ep);
void pcs_rpc_enumerate_rpc(struct pcs_rpc_engine *eng, void (*cb)(struct pcs_rpc *ep, void *arg), void *arg);
-void pcs_rpc_set_sock(struct pcs_rpc *ep, struct pcs_sockio * sio);
void pcs_rpc_enable(struct pcs_rpc * ep, int error);
-void rpc_eof_cb(struct pcs_sockio *sio);
+void rpc_eof_cb(struct pcs_netio *netio);
static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
{
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.c b/fs/fuse/kio/pcs/pcs_sock_conn.c
index 039fcf80dea6..83043177fb92 100644
--- a/fs/fuse/kio/pcs/pcs_sock_conn.c
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.c
@@ -327,6 +327,7 @@ struct pcs_rpc_auth
static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
{
struct pcs_rpc_engine *eng = ep->eng;
+ struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
struct pcs_rpc_auth *au;
size_t msg_sz = sizeof(struct pcs_rpc_auth) +
round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN) +
@@ -377,7 +378,7 @@ static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
TRACE("state=%d, type=%d, len=%d, msg_sz: %lu",
au->state, au->payload.type, au->payload.len, msg_sz);
- err = send_buf(ep->conn->socket, (u8*)au, msg_sz);
+ err = send_buf(sio->socket, (u8*)au, msg_sz);
if (err)
TRACE("Can't send au msg, err: %d", err);
pcs_free_msg(msg);
@@ -387,6 +388,7 @@ static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
{
+ struct pcs_sockio *sio = sio_from_ioconn(ep->conn);
struct pcs_rpc_auth *au;
size_t fixed_sz = sizeof(struct pcs_rpc_auth) +
round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN);
@@ -399,7 +401,7 @@ static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
return -ENOMEM;
}
- err = recv_buf(ep->conn->socket, msg->_inline_buffer, fixed_sz);
+ err = recv_buf(sio->socket, msg->_inline_buffer, fixed_sz);
if (err) {
TRACE("Can't recv auth msg(%d), err: %lu", err, fixed_sz);
goto fail;
@@ -440,7 +442,7 @@ static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
size_t rest_sz = au->hdr.len - fixed_sz;
while (rest_sz) {
size_t recv_sz = min(fixed_sz, rest_sz);
- err = recv_buf(ep->conn->socket, msg->_inline_buffer,
+ err = recv_buf(sio->socket, msg->_inline_buffer,
recv_sz);
if (err) {
TRACE("Can't recv auth msg(%d), err: %lu",
@@ -591,7 +593,6 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
iov_iter_kvec(&sio->write_iter, WRITE, NULL, 0, 0);
sio->hdr_max = sizeof(struct pcs_rpc_hdr);
sio->flags = sa->sa_family != AF_UNIX ? PCS_SOCK_F_CORK : 0;
- INIT_LIST_HEAD(&sio->ioconn.list);
err = sock_create(sa->sa_family, SOCK_STREAM, 0, &sock);
if (err < 0) {
@@ -616,13 +617,14 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
cancel_delayed_work(&ep->timer_work);
ep->retries++;
- ep->conn = &sio->ioconn;
- sio->parent = pcs_rpc_get(ep);
- sio->get_msg = rpc_get_hdr;
- sio->eof = rpc_eof_cb;
+ ep->conn = &sio->netio.ioconn;
sio->send_timeout = PCS_SIO_TIMEOUT;
- sio->ioconn.socket = sock;
- sio->ioconn.destruct = pcs_sock_ioconn_destruct;
+ sio->socket = sock;
+ sio->netio.ioconn.destruct = pcs_sock_ioconn_destruct;
+ sio->netio.parent = pcs_rpc_get(ep);
+ sio->netio.tops = &pcs_sock_netio_tops;
+ sio->netio.getmsg = rpc_get_hdr;
+ sio->netio.eof = rpc_eof_cb;
if (ep->gc)
list_lru_add(&ep->gc->lru, &ep->lru_link);
@@ -646,10 +648,10 @@ void pcs_sockconnect_start(struct pcs_rpc *ep)
* since this seems to be able to result in performance.
*/
WARN_ON_ONCE(sock->sk->sk_user_data);
- sio->ioconn.orig.user_data = sock->sk->sk_user_data;
- sio->ioconn.orig.data_ready = sock->sk->sk_data_ready;
- sio->ioconn.orig.write_space = sock->sk->sk_write_space;
- sio->ioconn.orig.error_report = sock->sk->sk_error_report;
+ sio->orig.user_data = sock->sk->sk_user_data;
+ sio->orig.data_ready = sock->sk->sk_data_ready;
+ sio->orig.write_space = sock->sk->sk_write_space;
+ sio->orig.error_report = sock->sk->sk_error_report;
sock->sk->sk_sndtimeo = PCS_SIO_TIMEOUT;
sock->sk->sk_allocation = GFP_NOFS;
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 4191943540a3..33feb33ecec0 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -12,7 +12,7 @@
#include "log.h"
-static void sio_msg_sent(struct pcs_msg * msg)
+void pcs_msg_sent(struct pcs_msg * msg)
{
msg->stage = PCS_MSG_STAGE_SENT;
if (msg->timeout) {
@@ -22,13 +22,13 @@ static void sio_msg_sent(struct pcs_msg * msg)
}
}
-void sio_push(struct pcs_sockio * sio)
+static void sio_push(struct pcs_sockio * sio)
{
- TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->parent));
+ TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
if (sio->flags & PCS_SOCK_F_CORK) {
int optval = 1;
int ret;
- ret = kernel_setsockopt(sio->ioconn.socket, SOL_TCP, TCP_NODELAY,
+ ret = kernel_setsockopt(sio->socket, SOL_TCP, TCP_NODELAY,
(char *)&optval, sizeof(optval));
if (ret)
TRACE("kernel_setsockopt(TCP_NODELAY) failed: %d", ret);
@@ -36,18 +36,18 @@ void sio_push(struct pcs_sockio * sio)
}
}
-static void pcs_ioconn_unregister(struct pcs_ioconn *ioconn)
+static void pcs_ioconn_unregister(struct pcs_sockio *sio)
{
- if (!test_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags))
- set_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags);
+ if (!test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
+ set_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags);
}
-static void pcs_ioconn_close(struct pcs_ioconn *ioconn)
+static void pcs_ioconn_close(struct pcs_sockio *sio)
{
- kernel_sock_shutdown(ioconn->socket, SHUT_RDWR);
+ kernel_sock_shutdown(sio->socket, SHUT_RDWR);
}
-void sio_abort(struct pcs_sockio * sio, int error)
+static void sio_abort(struct pcs_sockio * sio, int error)
{
if (sio->current_msg) {
pcs_free_msg(sio->current_msg);
@@ -59,32 +59,23 @@ void sio_abort(struct pcs_sockio * sio, int error)
struct pcs_msg * msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
list_del(&msg->list);
sio->write_queue_len -= msg->size;
- sio_msg_sent(msg);
+ pcs_msg_sent(msg);
pcs_set_local_error(&msg->error, error);
BUG_ON(!hlist_unhashed(&msg->kill_link));
msg->done(msg);
}
- pcs_ioconn_unregister(&sio->ioconn);
- pcs_ioconn_close(&sio->ioconn);
+ pcs_ioconn_unregister(sio);
+ pcs_ioconn_close(sio);
pcs_set_local_error(&sio->error, error);
- if (sio->eof) {
- void (*eof)(struct pcs_sockio *) = sio->eof;
- sio->eof = NULL;
- (*eof)(sio);
+ if (sio->netio.eof) {
+ void (*eof)(struct pcs_netio *) = sio->netio.eof;
+ sio->netio.eof = NULL;
+ (*eof)(&sio->netio);
}
}
-
-void pcs_sock_abort(struct pcs_sockio * sio)
-{
- if (!sio)
- return;
-
- sio_abort(sio, PCS_ERR_NET_ABORT);
-}
-
-void pcs_sock_error(struct pcs_sockio * sio, int error)
+static void pcs_sock_error(struct pcs_sockio * sio, int error)
{
sio_abort(sio, error);
}
@@ -195,7 +186,7 @@ static int do_sock_recv(struct socket *sock, void *buf, size_t len)
rcu_read_lock();
sio = rcu_dereference_sk_user_data(sock->sk);
if (sio) {
- TRACE("RET: "PEER_FMT" len:%ld ret:%d\n", PEER_ARGS(sio->parent),
+ TRACE("RET: "PEER_FMT" len:%ld ret:%d\n", PEER_ARGS(sio->netio.parent),
len, ret);
}
rcu_read_unlock();
@@ -212,20 +203,19 @@ static int do_sock_recv_callback(struct kvec *vec, void *context)
static void pcs_sockio_recv(struct pcs_sockio *sio)
{
- struct pcs_ioconn* conn = &sio->ioconn;
struct iov_iter *it = &sio->read_iter;
- struct pcs_rpc *ep = sio->parent;
+ struct pcs_rpc *ep = sio->netio.parent;
int count = 0;
u32 msg_size;
unsigned long loop_timeout = jiffies + PCS_SIO_SLICE;
TRACE("ENTER:" PEER_FMT " sio:%p cur_msg:%p\n", PEER_ARGS(ep), sio, sio->current_msg);
- while(!test_bit(PCS_IOCONN_BF_DEAD, &conn->flags)) {
+ while(!test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags)) {
int n;
struct pcs_msg * msg;
- if (test_bit(PCS_IOCONN_BF_ERROR, &conn->flags)) {
+ if (test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags)) {
sio_abort(sio, PCS_ERR_NET_ABORT);
return;
}
@@ -238,14 +228,14 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
n = 0;
if (copy)
- n = do_sock_recv(conn->socket, (char *)sio_inline_buffer(sio) + sio->hdr_ptr, copy);
+ n = do_sock_recv(sio->socket, (char *)sio_inline_buffer(sio) + sio->hdr_ptr, copy);
if (n > 0 || n == copy /* recv return 0 when copy is 0 */) {
sio->hdr_ptr += n;
if(sio->hdr_ptr != sio->hdr_max)
return;
- msg = sio->get_msg(sio, &msg_size);
+ msg = sio->netio.getmsg(&sio->netio, sio_inline_buffer(sio), &msg_size);
if (msg == NULL) {
if (sio->hdr_ptr < sio->hdr_max)
continue;
@@ -286,7 +276,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
n = iov_iter_for_each_range(it, min(iov_iter_single_seg_count(it),
(size_t)(msg_size - sio->read_offset)), do_sock_recv_callback,
- conn->socket);
+ sio->socket);
if (n > 0) {
sio->read_offset += n;
iov_iter_advance(it, n);
@@ -314,8 +304,7 @@ static void pcs_sockio_recv(struct pcs_sockio *sio)
static void pcs_sockio_send(struct pcs_sockio *sio)
{
- struct pcs_rpc *ep __maybe_unused = sio->parent;
- struct pcs_ioconn* conn = &sio->ioconn;
+ struct pcs_rpc *ep __maybe_unused = sio->netio.parent;
struct iov_iter *it = &sio->write_iter;
unsigned long loop_timeout = jiffies + PCS_SIO_SLICE;
struct pcs_msg * msg;
@@ -329,14 +318,14 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
/* This is the original check, but it is not clear how the connection can become
dead before sio_abort() was called. Let's simplify it with BUG_ON
- if (conn->dead) {
+ if (sio->dead) {
pcs_set_local_error(&msg->error, PCS_ERR_NET_ABORT);
goto done;
}
*/
- BUG_ON(test_bit(PCS_IOCONN_BF_DEAD, &conn->flags));
+ BUG_ON(test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags));
- if (test_bit(PCS_IOCONN_BF_ERROR, &conn->flags)) {
+ if (test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags)) {
sio_abort(sio, PCS_ERR_NET_ABORT);
return;
}
@@ -354,8 +343,8 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
msg->get_iter(msg, sio->write_offset, it, WRITE);
}
BUG_ON(iov_iter_count(it) > left);
- n = do_send_one_seg(conn->socket, it, iov_iter_single_seg_count(it),
- iov_iter_single_seg_count(it) < left);
+ n = do_send_one_seg(sio->socket, it, iov_iter_single_seg_count(it),
+ iov_iter_single_seg_count(it) < left);
if (n > 0) {
sio->write_offset += n;
iov_iter_advance(it, n);
@@ -383,7 +372,7 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
}
sio->write_offset = 0;
iov_iter_kvec(it, WRITE, NULL, 0, 0);
- sio_msg_sent(msg);
+ pcs_msg_sent(msg);
msg->done(msg);
if (++count >= PCS_SIO_PREEMPT_LIMIT ||
time_is_before_jiffies(loop_timeout)) {
@@ -395,9 +384,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
sio_push(sio);
}
-void pcs_sockio_xmit(struct pcs_sockio *sio)
+static void pcs_sockio_xmit(struct pcs_netio *netio)
{
- struct pcs_rpc *ep = sio->parent;
+ struct pcs_sockio *sio = sio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
BUG_ON(!mutex_is_locked(&ep->mutex));
@@ -406,13 +396,15 @@ void pcs_sockio_xmit(struct pcs_sockio *sio)
pcs_sockio_send(sio);
}
-int pcs_sockio_delayed_seg(struct pcs_sockio *sio)
+static int pcs_sockio_flush(struct pcs_netio *netio)
{
+ struct pcs_sockio *sio = sio_from_netio(netio);
return sio->flags & (PCS_SOCK_F_POOLOUT|PCS_SOCK_F_POOLIN);
}
-void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
+static void pcs_sock_sendmsg(struct pcs_netio *netio, struct pcs_msg *msg)
{
+ struct pcs_sockio *sio = sio_from_netio(netio);
int was_idle = list_empty(&sio->write_queue);
DTRACE("sio(%p) msg:%p\n", sio, msg);
@@ -422,7 +414,7 @@ void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
msg->done(msg);
return;
}
- msg->sio = sio;
+ msg->netio = &sio->netio;
list_add_tail(&msg->list, &sio->write_queue);
sio->write_queue_len += msg->size;
@@ -441,11 +433,11 @@ void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg)
/* Try to cancel message send. If that is impossible because the message is in the middle
* of a write, do nothing and return an error.
*/
-int pcs_sock_cancel_msg(struct pcs_msg * msg)
+static int pcs_sock_cancel_msg(struct pcs_msg * msg)
{
- struct pcs_sockio * sio = msg->sio;
+ struct pcs_sockio * sio = sio_from_netio(msg->netio);
- BUG_ON(msg->sio == NULL);
+ BUG_ON(msg->netio == NULL);
if (sio->write_queue.next == &msg->list) {
if (sio->write_offset)
@@ -465,21 +457,16 @@ int pcs_sock_cancel_msg(struct pcs_msg * msg)
return 0;
}
-int pcs_sock_queuelen(struct pcs_sockio * sio)
+static void pcs_restore_sockets(struct pcs_sockio *sio)
{
- return sio->write_queue_len;
-}
-
-void pcs_restore_sockets(struct pcs_ioconn *ioconn)
-{
- struct sock *sk = ioconn->socket->sk;
+ struct sock *sk = sio->socket->sk;
write_lock_bh(&sk->sk_callback_lock);
if (sk->sk_user_data) {
- rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
- sk->sk_data_ready = ioconn->orig.data_ready;
- sk->sk_write_space = ioconn->orig.write_space;
- sk->sk_error_report = ioconn->orig.error_report;
+ rcu_assign_sk_user_data(sk, sio->orig.user_data);
+ sk->sk_data_ready = sio->orig.data_ready;
+ sk->sk_write_space = sio->orig.write_space;
+ sk->sk_error_report = sio->orig.error_report;
//sock->sk->sk_state_change = pcs_state_chage;
}
write_unlock_bh(&sk->sk_callback_lock);
@@ -491,7 +478,7 @@ void pcs_restore_sockets(struct pcs_ioconn *ioconn)
static void sio_destroy_rcu(struct rcu_head *head)
{
struct pcs_sockio *sio = container_of(head, struct pcs_sockio, rcu);
- struct pcs_rpc *ep = sio->parent;
+ struct pcs_rpc *ep = sio->netio.parent;
pcs_rpc_put(ep);
memset(sio, 0xFF, sizeof(*sio));
@@ -508,10 +495,10 @@ void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
BUG_ON(!list_empty(&sio->write_queue));
BUG_ON(sio->write_queue_len);
- if (ioconn->socket) {
- pcs_restore_sockets(ioconn);
- sock_release(ioconn->socket);
- ioconn->socket = NULL;
+ if (sio->socket) {
+ pcs_restore_sockets(sio);
+ sock_release(sio->socket);
+ sio->socket = NULL;
}
/* Wait pending socket callbacks, e.g., sk_data_ready() */
@@ -527,7 +514,7 @@ static void pcs_sk_kick_queue(struct sock *sk)
rcu_read_lock();
sio = rcu_dereference_sk_user_data(sk);
if (sio) {
- struct pcs_rpc *ep = sio->parent;
+ struct pcs_rpc *ep = sio->netio.parent;
TRACE(PEER_FMT" queue\n", PEER_ARGS(ep));
pcs_rpc_kick_queue(ep);
}
@@ -553,34 +540,27 @@ void pcs_sk_error_report(struct sock *sk)
rcu_read_lock();
sio = rcu_dereference_sk_user_data(sk);
if (sio) {
- struct pcs_rpc *ep = sio->parent;
+ struct pcs_rpc *ep = sio->netio.parent;
- if (test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags) ||
- test_bit(PCS_IOCONN_BF_ERROR, &sio->ioconn.flags))
+ if (test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags) ||
+ test_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags))
goto unlock;
- set_bit(PCS_IOCONN_BF_ERROR, &sio->ioconn.flags);
+ set_bit(PCS_IOCONN_BF_ERROR, &sio->io_flags);
pcs_rpc_kick_queue(ep);
}
unlock:
rcu_read_unlock();
}
-void pcs_sockio_start(struct pcs_sockio * sio)
-{
- //// TODO: dmonakhov
- ////pcs_ioconn_register(&sio->ioconn);
-}
-
static void pcs_deaccount_msg(struct pcs_msg * msg)
{
- msg->sio = NULL;
+ msg->netio = NULL;
}
static void pcs_account_msg(struct pcs_sockio * sio, struct pcs_msg * msg)
{
- msg->sio = sio;
-
+ msg->netio = &sio->netio;
}
static void pcs_msg_input_destructor(struct pcs_msg * msg)
@@ -632,7 +612,7 @@ struct pcs_msg * pcs_alloc_output_msg(int datalen)
if (msg) {
pcs_msg_io_init(msg);
msg->rpc = NULL;
- msg->sio = NULL;
+ msg->netio = NULL;
msg->destructor = pcs_io_msg_output_destructor;
msg->get_iter = pcs_get_iter_inline;
}
@@ -729,24 +709,59 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int copy_len)
return clone;
}
-void pcs_sock_throttle(struct pcs_sockio * sio)
+static void pcs_sock_throttle(struct pcs_netio *netio)
{
+ struct pcs_sockio *sio = sio_from_netio(netio);
+
if ((sio->flags & PCS_SOCK_F_THROTTLE) ||
- test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags))
+ test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
return;
- DTRACE("Throttle on socket %p rpc=%p", sio, sio->parent);
+ DTRACE("Throttle on socket %p rpc=%p", sio, sio->netio.parent);
sio->flags |= PCS_SOCK_F_THROTTLE;
}
-void pcs_sock_unthrottle(struct pcs_sockio * sio)
+static void pcs_sock_unthrottle(struct pcs_netio *netio)
{
+ struct pcs_sockio *sio = sio_from_netio(netio);
+
if (!(sio->flags & PCS_SOCK_F_THROTTLE) ||
- test_bit(PCS_IOCONN_BF_DEAD, &sio->ioconn.flags))
+ test_bit(PCS_IOCONN_BF_DEAD, &sio->io_flags))
return;
- DTRACE("Unthrottle on socket %p rpc=%p", sio, sio->parent);
+ DTRACE("Unthrottle on socket %p rpc=%p", sio, sio->netio.parent);
sio->flags &= ~PCS_SOCK_F_THROTTLE;
if ((sio->flags & PCS_SOCK_F_EOF))
return;
}
+
+static void pcs_sock_abort_io(struct pcs_netio *netio, int error)
+{
+ struct pcs_sockio *sio = sio_from_netio(netio);
+
+ netio->eof = NULL;
+ pcs_sock_error(sio, error);
+}
+
+static unsigned long pcs_sock_next_timeout(struct pcs_netio *netio)
+{
+ struct pcs_sockio *sio = sio_from_netio(netio);
+ struct pcs_msg *msg;
+
+ if (list_empty(&sio->write_queue))
+ return 0;
+
+ msg = list_first_entry(&sio->write_queue, struct pcs_msg, list);
+ return msg->start_time + sio->send_timeout;
+}
+
+struct pcs_netio_tops pcs_sock_netio_tops = {
+ .throttle = pcs_sock_throttle,
+ .unthrottle = pcs_sock_unthrottle,
+ .send_msg = pcs_sock_sendmsg,
+ .cancel_msg = pcs_sock_cancel_msg,
+ .abort_io = pcs_sock_abort_io,
+ .xmit = pcs_sockio_xmit,
+ .flush = pcs_sockio_flush,
+ .next_timeout = pcs_sock_next_timeout,
+};
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 35bf12c9f53a..6487612eabce 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -5,10 +5,12 @@
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/un.h>
+#include <linux/fs.h>
#include "pcs_types.h"
////#include "pcs_process.h"
#include "pcs_error.h"
+#include "pcs_net.h"
#include "log.h"
#define PCS_MSG_MAX_CALENDAR 64
@@ -17,6 +19,8 @@
#define PCS_SIO_PREEMPT_LIMIT 16
#define PCS_SIO_SLICE (5 * HZ / 1000) /* 5ms */
+extern struct pcs_netio_tops pcs_sock_netio_tops;
+
struct pcs_api_channel
{
@@ -37,7 +41,7 @@ __pre_packed struct pcs_msg
struct pcs_msg *response; /* Consider removing. It can be done passing the second
* argument to done();
*/
- struct pcs_sockio *sio;
+ struct pcs_netio *netio;
struct pcs_rpc *rpc;
int size;
@@ -96,12 +100,13 @@ enum
PCS_IOCONN_BF_DEAD = 0,
PCS_IOCONN_BF_ERROR = 1, /* Notify from ->sk_error_report */
};
-struct pcs_ioconn {
- struct list_head list;
- struct socket *socket;
+struct pcs_sockio
+{
+ struct pcs_netio netio;
- unsigned long flags; /* atomic bit ops */
+ struct socket *socket;
+ unsigned long io_flags; /* atomic bit ops */
/* Save original socket->sk callbacks */
struct {
void *user_data;
@@ -110,18 +115,10 @@ struct pcs_ioconn {
void (*data_ready)(struct sock *sk);
void (*write_space)(struct sock *sk);
} orig;
- void(*destruct)(struct pcs_ioconn *);
-
-};
-
-struct pcs_sockio
-{
- struct pcs_ioconn ioconn;
struct list_head write_queue;
int write_queue_len;
spinlock_t q_lock;
- struct pcs_rpc *parent;
pcs_error_t error;
int send_timeout;
@@ -138,32 +135,18 @@ struct pcs_sockio
struct iov_iter read_iter;
struct iov_iter write_iter;
struct mutex mutex;
- struct pcs_msg * (*get_msg)(struct pcs_sockio *, u32 *);
- /* eof() handler could be called twice: once on graceful socket shutdown and from sio_abort() */
- void (*eof)(struct pcs_sockio *);
void (*write_wakeup)(struct pcs_sockio *);
struct rcu_head rcu;
char _inline_buffer[0];
};
-#define sio_from_ioconn(conn) container_of(conn, struct pcs_sockio, ioconn)
-
-void pcs_sockio_start(struct pcs_sockio * sio);
-void pcs_sock_sendmsg(struct pcs_sockio * sio, struct pcs_msg *msg);
-int pcs_sock_cancel_msg(struct pcs_msg * msg);
-void pcs_sockio_xmit(struct pcs_sockio *sio);
-int pcs_sockio_delayed_seg(struct pcs_sockio *sio);
-int pcs_sock_queuelen(struct pcs_sockio * sio);
-void pcs_sock_abort(struct pcs_sockio * sio);
-void pcs_sock_error(struct pcs_sockio * sio, int error);
-
void pcs_sk_data_ready(struct sock *sk);
void pcs_sk_write_space(struct sock *sk);
void pcs_sk_error_report(struct sock *sk);
-void pcs_sock_throttle(struct pcs_sockio * sio);
-void pcs_sock_unthrottle(struct pcs_sockio * sio);
+#define sio_from_netio(nio) container_of(nio, struct pcs_sockio, netio)
+#define sio_from_ioconn(conn) container_of(conn, struct pcs_sockio, netio.ioconn)
struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * sio, int datalen);
struct pcs_msg * pcs_alloc_output_msg(int datalen);
@@ -190,6 +173,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
iov_iter_get_kvec_callback, vec);
}
void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
+void pcs_msg_sent(struct pcs_msg * msg);
static inline void * msg_inline_head(struct pcs_msg * msg)
{
@@ -244,17 +228,4 @@ static inline void pcs_msg_io_fini(struct pcs_msg * msg)
BUG_ON(msg->_iocount != 0);
}
-
-struct bufqueue;
-
-/**
- Present a portion of @bq as a pcs_msg that may be passed to pcs_sock_sendmsg().
- Reading data from the pcs_msg will drain @bq.
-
- \param @bq the buffer queue with the data of a message
- \param @size the length of the head of @bq that will be presented as a pcs_msg
- \returns a pcs_msg that reads data from @bq
-*/
-struct pcs_msg* bufqueue_as_pcs_output_msg(struct bufqueue *bq, u32 size);
-
#endif /* _PCS_SOCK_IO_H_ */