[Devel] [PATCH VZ9 v2 2/2] fs/fuse kio: introduce pcs_krpc - export kernel RPC to userspace
Liu Kui
kui.liu at virtuozzo.com
Thu May 23 13:55:02 MSK 2024
Currently there are 2 connections for every RPC, one in userspace,
one in kernel. This wastes a lot of resources on client hosts in case
of a huge cluster. It's therefore desirable to eliminate connections
in userspace by using kernel RPC directly for userspace RPC.
This patch makes the pcs_fuse_kio module provide such functionality for
userspace to use. Major changes are:
- Introduce a new struct pcs_krpc, which links a userspace RPC connection
directly to the corresponding kernel RPC. It also provides the execution
context for forwarding RPC messages to and from the kernel RPC.
- Add several new commands to the FUSE_IOC_KIO_CALL ioctl for pcs_krpc
management.
- Change the kernel RPC ops to demux messages to either pcs_cs or pcs_krpc.
https://pmc.acronis.work/browse/VSTOR-82613
Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
---
fs/fuse/Makefile | 6 +-
fs/fuse/dev.c | 19 +-
fs/fuse/fuse_i.h | 2 +-
fs/fuse/kio/pcs/pcs_cluster_core.c | 4 +
fs/fuse/kio/pcs/pcs_cs.c | 120 +----
fs/fuse/kio/pcs/pcs_cs.h | 5 +-
fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 96 +++-
fs/fuse/kio/pcs/pcs_ioctl.h | 29 ++
fs/fuse/kio/pcs/pcs_krpc.c | 794 +++++++++++++++++++++++++++++
fs/fuse/kio/pcs/pcs_krpc.h | 140 +++++
fs/fuse/kio/pcs/pcs_krpc_prot.h | 44 ++
fs/fuse/kio/pcs/pcs_mr.c | 4 +-
fs/fuse/kio/pcs/pcs_req.h | 30 +-
fs/fuse/kio/pcs/pcs_rpc.c | 3 +-
fs/fuse/kio/pcs/pcs_rpc.h | 4 +-
fs/fuse/kio/pcs/pcs_rpc_clnt.c | 188 +++++++
fs/fuse/kio/pcs/pcs_rpc_clnt.h | 16 +
17 files changed, 1386 insertions(+), 118 deletions(-)
create mode 100644 fs/fuse/kio/pcs/pcs_krpc.c
create mode 100644 fs/fuse/kio/pcs/pcs_krpc.h
create mode 100644 fs/fuse/kio/pcs/pcs_krpc_prot.h
create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.c
create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.h
diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 740c805adc2a..18eaa35a234b 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -33,7 +33,11 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
kio/pcs/pcs_rdma_rw.o \
kio/pcs/pcs_rdma_conn.o \
kio/pcs/pcs_net_addr.o \
- kio/pcs/pcs_cs_accel.o
+ kio/pcs/pcs_cs_accel.o \
+ kio/pcs/pcs_rpc_clnt.o \
+ kio/pcs/pcs_mr.o \
+ kio/pcs/pcs_krpc.o
+
fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o
virtiofs-y := virtio_fs.o
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index e00f5b180e95..bf54adf2bca9 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2657,11 +2657,20 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd,
if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
return -EFAULT;
- op = fuse_kio_get(NULL, req.name);
- if (op == NULL)
- return -EINVAL;
- res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
- fuse_kio_put(op);
+
+ fud = fuse_get_dev(file);
+ if (fud) {
+ op = fud->fc->kio.op;
+ if (op == NULL)
+ return -EINVAL;
+ res = op->dev_ioctl(fud->fc, req.cmd, req.data, req.len);
+ } else {
+ op = fuse_kio_get(NULL, req.name);
+ if (op == NULL)
+ return -EINVAL;
+ res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
+ fuse_kio_put(op);
+ }
break;
}
default:
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index 0c1bd5209dbc..090871a1e356 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -649,7 +649,7 @@ struct fuse_kio_ops {
void (*inode_release)(struct fuse_inode *fi);
void (*kill_requests)(struct fuse_conn *fc, struct inode *inode);
int (*ioctl)(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len);
-
+ int (*dev_ioctl)(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len);
};
int fuse_register_kio(struct fuse_kio_ops *ops);
void fuse_unregister_kio(struct fuse_kio_ops *ops);
diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c
index 8c34c60eb79a..6df2ccee9a1f 100644
--- a/fs/fuse/kio/pcs/pcs_cluster_core.c
+++ b/fs/fuse/kio/pcs/pcs_cluster_core.c
@@ -162,6 +162,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
cc->nilbuffer_kv.iov_len = sizeof(cc->nilbuffer);
pcs_csset_init(&cc->css);
+ pcs_mrset_init(&cc->mrs);
+ pcs_krpcset_init(&cc->krpcs);
err = pcs_mapset_init(cc);
if (err)
@@ -210,6 +212,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
void pcs_cc_fini(struct pcs_cluster_core *cc)
{
+ pcs_krpcset_fini(&cc->krpcs);
+ pcs_mrset_fini(&cc->mrs);
pcs_csset_fini(&cc->css);
pcs_mapset_fini(&cc->maps);
pcs_rpc_engine_fini(&cc->eng);
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 6cc97aa3f9d0..a48c3fbfa5b7 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -25,7 +25,6 @@
#include "log.h"
#include "fuse_ktrace.h"
#include "pcs_net_addr.h"
-
/* Lock order: cs->lock -> css->lock (lru, hash, bl_list) */
@@ -39,22 +38,9 @@ struct pcs_rpc_params cn_rpc_params = {
.flags = 0,
};
-static void cs_aborting(struct pcs_rpc *ep, int error);
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
-static void cs_connect(struct pcs_rpc *ep);
static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
static void pcs_cs_destroy(struct pcs_cs *cs);
-struct pcs_rpc_ops cn_rpc_ops = {
- .demux_request = cs_input,
- .get_hdr = cs_get_hdr,
- .state_change = cs_aborting,
- .keep_waiting = cs_keep_waiting,
- .connect = cs_connect,
-};
-
static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
{
seqlock_init(&cs->stat.seqlock);
@@ -124,8 +110,7 @@ static void pcs_cs_percpu_stat_free(struct pcs_cs *cs)
free_percpu(cs->stat.iolat);
}
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
- struct pcs_cluster_core *cc)
+struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css)
{
struct pcs_cs *cs;
@@ -153,13 +138,6 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
kfree(cs);
return NULL;
}
- cs->rpc = pcs_rpc_create(&cc->eng, &cn_rpc_params, &cn_rpc_ops);
- if (cs->rpc == NULL) {
- pcs_cs_percpu_stat_free(cs);
- kfree(cs);
- return NULL;
- }
- cs->rpc->private = cs;
INIT_LIST_HEAD(&cs->map_list);
return cs;
}
@@ -222,6 +200,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
/* If rpc is connected, leave it connected until failure.
* After current connect fails, reconnect will be done to new address
*/
+ struct pcs_rpc *rpc = cs->rpc;
if (addr) {
if (addr->type != PCS_ADDRTYPE_NONE) {
if (pcs_netaddr_cmp(&cs->addr, addr)) {
@@ -231,7 +210,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
FUSE_KTRACE(cc_from_csset(csset)->fc,
"Port change CS" NODE_FMT " seq=%d",
NODE_ARGS(*id), cs->addr_serno);
- pcs_rpc_set_address(cs->rpc, addr);
+ pcs_rpc_set_address(rpc, addr);
if (!(flags & CS_FL_INACTIVE)) {
pcs_map_notify_addr_change(cs);
@@ -246,30 +225,31 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
}
}
if (flags & CS_FL_LOCAL_SOCK)
- cs->rpc->flags |= PCS_RPC_F_LOCAL;
+ rpc->flags |= PCS_RPC_F_LOCAL;
else
- cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
+ rpc->flags &= ~PCS_RPC_F_LOCAL;
return cs;
}
BUG_ON(addr == NULL);
- cs = pcs_cs_alloc(csset, cc_from_csset(csset));
+ cs = pcs_cs_alloc(csset);
if (!cs)
return NULL;
+ cs->rpc = pcs_rpc_clnt_create(&cc_from_csset(csset)->eng, id, addr, flags);
+
+ if (!cs->rpc) {
+ pcs_cs_percpu_stat_free(cs);
+ kfree(cs);
+ return NULL;
+ }
+ cs->rpc->clnt_cs = cs;
+
cs->id = *id;
cs->addr = *addr;
cs->addr_serno = 1;
- pcs_rpc_set_peer_id(cs->rpc, id, PCS_NODE_ROLE_CS);
- pcs_rpc_set_address(cs->rpc, addr);
-
- if (flags & CS_FL_LOCAL_SOCK)
- cs->rpc->flags |= PCS_RPC_F_LOCAL;
- else
- cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
-
spin_lock(&cs->lock);
spin_lock(&csset->lock);
if (__lookup_cs(csset, id)) {
@@ -455,46 +435,7 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
}
}
-static void cs_connect(struct pcs_rpc *ep)
-{
- void (*connect_start)(struct pcs_rpc *);
-
- if (ep->flags & PCS_RPC_F_LOCAL) {
- char path[128];
-
- snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
- (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
-
- if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
- TRACE("Path to local socket is too long: %s", path);
-
- ep->flags &= ~PCS_RPC_F_LOCAL;
- goto fail;
- }
- memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
- ep->sh.sun.sun_family = AF_UNIX;
- ep->sh.sa_len = sizeof(struct sockaddr_un);
- strcpy(ep->sh.sun.sun_path, path);
- connect_start = pcs_sockconnect_start;
- } else {
- /* TODO: print sock addr using pcs_format_netaddr() */
- if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
- TRACE("netaddr to sockaddr failed");
- goto fail;
- }
- connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
- pcs_rdmaconnect_start : pcs_sockconnect_start;
- }
- ep->state = PCS_RPC_CONNECT;
- connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
- return;
-fail:
- pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
- pcs_rpc_reset(ep);
- return;
-}
-
-static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
{
struct pcs_msg *msg, *resp;
struct pcs_rpc_hdr *req_h;
@@ -887,7 +828,7 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
do_cs_submit(cs, ireq);
}
-static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
{
struct pcs_cs *who;
@@ -945,10 +886,10 @@ static int may_reroute(struct pcs_cs_list *csl, PCS_NODE_ID_T cs_id)
return legit;
}
-static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
{
struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
- struct pcs_cs *cs = ep->private;
+ struct pcs_cs *cs = ep->clnt_cs;
struct pcs_cs *who;
/* Some CS reported it cannot complete local IO in time, close congestion window */
@@ -1003,21 +944,6 @@ static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_
}
-static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg)
-{
- struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
-
- switch (h->type) {
- case PCS_CS_CONG_NOTIFY:
- handle_congestion(ep->private, h);
- msg->done(msg);
- return 0;
- default:
- FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
- return PCS_ERR_PROTOCOL;
- }
-}
-
void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
{
struct list_head queue;
@@ -1094,17 +1020,13 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
BUG_ON(cs->csa_ctx);
if (cs->rpc) {
- pcs_rpc_close(cs->rpc);
+ cs->rpc->clnt_cs = NULL;
+ pcs_rpc_clnt_close(cs->rpc);
cs->rpc = NULL;
}
call_rcu(&cs->rcu, cs_destroy_rcu);
}
-void cs_aborting(struct pcs_rpc *ep, int error)
-{
- pcs_rpc_reset(ep);
-}
-
/* Latency is difficult value to use for any decisions.
* It is sampled at random, we do not know what is happening while
* we have no samples. For now we do the following: arriving samples
diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
index 61be99a54157..62b88f612b54 100644
--- a/fs/fuse/kio/pcs/pcs_cs.h
+++ b/fs/fuse/kio/pcs/pcs_cs.h
@@ -159,8 +159,6 @@ unsigned int cs_get_avg_in_flight(struct pcs_cs *cs);
void pcs_csset_init(struct pcs_cs_set *css);
void pcs_csset_fini(struct pcs_cs_set *css);
-struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, struct pcs_cluster_core *cc);
-
void cs_log_io_times(struct pcs_int_request *ireq, struct pcs_msg *resp, unsigned int max_iolat);
int pcs_cs_format_io_times(char *buf, int buflen, struct pcs_int_request *ireq, struct pcs_msg *resp);
void cs_set_io_times_logger(void (*logger)(struct pcs_int_request *ireq, struct pcs_msg *resp, u32 max_iolat, void *ctx), void *ctx);
@@ -219,4 +217,7 @@ int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
void pcs_csa_relay_iotimes(struct pcs_int_request * ireq, struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
void pcs_csa_cs_detach(struct pcs_cs * cs);
+void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h);
+struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
+void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
#endif /* _PCS_CS_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 566dcb5f2f4c..a71cb7a9a0b6 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -37,6 +37,8 @@
#include "fuse_ktrace.h"
#include "fuse_prometheus.h"
#include "pcs_net_addr.h"
+#include "pcs_mr.h"
+#include "pcs_krpc.h"
unsigned int pcs_loglevel = LOG_TRACE;
module_param(pcs_loglevel, uint, 0644);
@@ -71,6 +73,10 @@ unsigned int rdmaio_queue_depth = 8;
module_param(rdmaio_queue_depth, uint, 0644);
MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
+bool pcs_krpc_support = true;
+module_param(pcs_krpc_support, bool, 0444);
+MODULE_PARM_DESC(pcs_krpc_support, "krpc support");
+
void (*fuse_printk_plugin)(unsigned long, const char *, ...);
EXPORT_SYMBOL(fuse_printk_plugin);
@@ -189,7 +195,7 @@ static void process_pcs_init_reply(struct fuse_mount *fm, struct fuse_args *args
fuse_ktrace_setup(fc);
fc->ktrace_level = LOG_TRACE;
- printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT "\n",
+ printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT,
CLUSTER_ID_ARGS(info->cluster_id), NODE_ARGS(info->node_id));
spin_lock(&fc->lock);
@@ -1864,6 +1870,84 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
return res;
}
+/*
+ * Per-connection ioctl entry point, reached through the dev_ioctl hook of
+ * kio_pcs_ops. Dispatches the krpc and memory-region management commands
+ * against the cluster core that belongs to @fc.
+ *
+ * @arg is a userspace pointer to the command-specific request structure.
+ * @len is not used here.
+ *
+ * Returns the result of the invoked helper, -EFAULT when the request cannot
+ * be copied in, -EEXIST when creating a krpc whose id is already known, and
+ * -ENOIOCTLCMD for an unknown command.
+ */
+static int kpcs_dev_ioctl(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len)
+{
+	struct pcs_fuse_cluster *cluster = fc->kio.ctx;
+	struct pcs_cluster_core *cc = &cluster->cc;
+	void __user *uarg = (void __user *)arg;
+
+	switch (cmd) {
+	case PCS_IOC_KRPC_CREATE: {
+		struct pcs_ioc_krpc_create create;
+
+		if (copy_from_user(&create, uarg, sizeof(create)))
+			return -EFAULT;
+
+		if (pcs_krpc_lookup(&cc->krpcs, &create.id))
+			return -EEXIST;
+
+		return pcs_krpc_create(&cc->krpcs, &create.id, &create.addr, create.cs_flags);
+	}
+	case PCS_IOC_KRPC_UPDATE_ADDR: {
+		/* shares its request layout with PCS_IOC_KRPC_CREATE */
+		struct pcs_ioc_krpc_create update;
+
+		if (copy_from_user(&update, uarg, sizeof(update)))
+			return -EFAULT;
+
+		return pcs_krpc_update_addr(&cc->krpcs, &update.id, &update.addr, update.cs_flags);
+	}
+	case PCS_IOC_KRPC_CONNECT: {
+		struct pcs_ioc_krpc_connect connect;
+
+		if (copy_from_user(&connect, uarg, sizeof(connect)))
+			return -EFAULT;
+
+		return pcs_krpc_connect(&cc->krpcs, &connect.id);
+	}
+	case PCS_IOC_KRPC_DESTROY: {
+		struct pcs_ioc_krpc_destroy destroy;
+
+		if (copy_from_user(&destroy, uarg, sizeof(destroy)))
+			return -EFAULT;
+
+		return pcs_krpc_destroy(&cc->krpcs, &destroy.id);
+	}
+	case PCS_IOC_REG_MR: {
+		struct pcs_ioc_reg_mr reg;
+
+		if (copy_from_user(&reg, uarg, sizeof(reg)))
+			return -EFAULT;
+
+		return pcs_reg_mr(&cc->mrs, reg.start, reg.len);
+	}
+	case PCS_IOC_DEREG_MR: {
+		struct pcs_ioc_dereg_mr dereg;
+
+		if (copy_from_user(&dereg, uarg, sizeof(dereg)))
+			return -EFAULT;
+
+		return pcs_dereg_mr(&cc->mrs, dereg.id);
+	}
+	default:
+		return -ENOIOCTLCMD;
+	}
+}
+
static struct fuse_kio_ops kio_pcs_ops = {
.name = "pcs",
.owner = THIS_MODULE,
@@ -1880,6 +1964,7 @@ static struct fuse_kio_ops kio_pcs_ops = {
.inode_release = kpcs_inode_release,
.kill_requests = kpcs_kill_requests,
.ioctl = kpcs_ioctl,
+ .dev_ioctl = kpcs_dev_ioctl,
};
@@ -1920,10 +2005,13 @@ static int __init kpcs_mod_init(void)
if (pcs_csa_init())
goto free_cleanup_wq;
+ if (pcs_krpc_init())
+ goto free_csa;
+
fast_path_version = PCS_FAST_PATH_VERSION.full;
if (fuse_register_kio(&kio_pcs_ops))
- goto free_csa;
+ goto free_krpc;
/* Clone relay_file_operations to set ownership */
ktrace_file_operations = relay_file_operations;
@@ -1935,10 +2023,13 @@ static int __init kpcs_mod_init(void)
if (IS_ERR(crc_tfm))
crc_tfm = NULL;
+
printk("%s fuse_c:%p ireq_c:%p pcs_wq:%p\n", __FUNCTION__,
pcs_fuse_req_cachep, pcs_ireq_cachep, pcs_wq);
return 0;
+free_krpc:
+ pcs_krpc_fini();
free_csa:
pcs_csa_fini();
free_cleanup_wq:
@@ -1968,6 +2059,7 @@ static void __exit kpcs_mod_exit(void)
kmem_cache_destroy(pcs_ireq_cachep);
kmem_cache_destroy(pcs_fuse_req_cachep);
pcs_csa_fini();
+ pcs_krpc_fini();
}
module_init(kpcs_mod_init);
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 8e55be02c654..3c000373745b 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -123,4 +123,33 @@ struct pcs_csa_setmap
#define PCS_CSA_IOC_SETMAP _IOR('V',38, struct pcs_csa_setmap)
#define PCS_KIO_CALL_REG _IOR('V',39, struct fuse_pcs_ioc_register)
+struct pcs_ioc_reg_mr { /* request payload of PCS_IOC_REG_MR */
+u64 start; /* handed to pcs_reg_mr() as its start argument */
+u64 len; /* handed to pcs_reg_mr() as its len argument */
+};
+#define PCS_IOC_REG_MR _IOR('V', 40, struct pcs_ioc_reg_mr)
+
+struct pcs_ioc_dereg_mr { /* request payload of PCS_IOC_DEREG_MR */
+u32 id; /* handed to pcs_dereg_mr() to select the region */
+};
+#define PCS_IOC_DEREG_MR _IOR('V', 41, struct pcs_ioc_dereg_mr)
+
+struct pcs_ioc_krpc_create { /* shared by PCS_IOC_KRPC_CREATE and PCS_IOC_KRPC_UPDATE_ADDR */
+PCS_NODE_ID_T id; /* key used to look the krpc up */
+PCS_NET_ADDR_T addr; /* address passed on to the underlying rpc */
+int cs_flags; /* CS_FL_LOCAL_SOCK is checked by the address-update path */
+};
+#define PCS_IOC_KRPC_CREATE _IOR('V', 42, struct pcs_ioc_krpc_create)
+#define PCS_IOC_KRPC_UPDATE_ADDR _IOR('V', 43, struct pcs_ioc_krpc_create)
+
+struct pcs_ioc_krpc_connect { /* request payload of PCS_IOC_KRPC_CONNECT */
+PCS_NODE_ID_T id; /* handed to pcs_krpc_connect() */
+};
+#define PCS_IOC_KRPC_CONNECT _IOR('V', 44, struct pcs_ioc_krpc_connect)
+
+struct pcs_ioc_krpc_destroy { /* request payload of PCS_IOC_KRPC_DESTROY */
+PCS_NODE_ID_T id; /* handed to pcs_krpc_destroy() */
+};
+#define PCS_IOC_KRPC_DESTROY _IOR('V', 45, struct pcs_ioc_krpc_destroy)
+
#endif /* _PCS_IOCTL_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
new file mode 100644
index 000000000000..0e49dc227dca
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -0,0 +1,794 @@
+/*
+ * fs/fuse/kio/pcs/pcs_krpc.c
+ *
+ * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/rbtree.h>
+#include <linux/refcount.h>
+#include <linux/file.h>
+#include <linux/anon_inodes.h>
+
+#include "pcs_types.h"
+#include "pcs_cluster.h"
+#include "pcs_cs_prot.h"
+#include "pcs_req.h"
+#include "pcs_krpc.h"
+
+struct kmem_cache *krpc_req_cachep;
+
+/*
+ * Append @comp to the completion queue of @krpc and wake up pollers.
+ *
+ * Entered with krpc->lock held; the lock is dropped on every path.
+ * Returns 0 when the completion was queued and -1 when the krpc is not in
+ * PCS_KRPC_STATE_CONNECTED, in which case @comp is not touched and remains
+ * the caller's to free.
+ */
+static int krpc_completion_post(struct pcs_krpc *krpc, struct krpc_completion *comp)
+	__releases(krpc->lock)
+{
+	bool connected = krpc->state == PCS_KRPC_STATE_CONNECTED;
+
+	if (connected) {
+		list_add_tail(&comp->link, &krpc->completion_queue);
+		krpc->nr_completion++;
+	}
+	spin_unlock(&krpc->lock);
+
+	if (!connected)
+		return -1;
+
+	wake_up_poll(&krpc->poll_wait, EPOLLIN);
+	return 0;
+}
+
+/*
+ * Allocate a request from krpc_req_cachep. Only the lock, the flags and the
+ * embedded completion are initialised here; every other field is left for
+ * the caller to fill in. Returns NULL when the allocation fails.
+ */
+static struct krpc_req *krpc_req_alloc(void)
+{
+	struct krpc_req *req = kmem_cache_alloc(krpc_req_cachep, GFP_NOIO);
+
+	if (req) {
+		spin_lock_init(&req->lock);
+		req->flags = 0;
+		memset(&req->completion, 0, sizeof(req->completion));
+	}
+
+	return req;
+}
+
+/* Give a request obtained from krpc_req_alloc() back to its slab cache. */
+static void krpc_req_free(struct krpc_req *kreq)
+{
+	kmem_cache_free(krpc_req_cachep, kreq);
+}
+
+/*
+ * Free a completion. A completion with ->private set is embedded in the
+ * krpc_req that ->private points to, so that request is freed; otherwise the
+ * completion was allocated on its own and is released with kfree().
+ */
+static void krpc_completion_free(struct krpc_completion *comp)
+{
+	struct krpc_req *owner = (struct krpc_req *)comp->private;
+
+	if (!owner) {
+		kfree(comp);
+		return;
+	}
+
+	krpc_req_free(owner);
+}
+
+/* Drop the reference one data chunk holds on its backing memory. */
+static void krpc_chunk_release(struct krpc_chunk *chunk)
+{
+	if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
+		pcs_umem_release(chunk->umem);
+	else
+		pcs_mr_put(chunk->mr);
+}
+
+/*
+ * Finish a request: record @error in its completion, release the header and
+ * data chunk references, unlink it from the krpc and either post the
+ * completion to userspace or free the request.
+ *
+ * The request is freed right away when it was flagged KRPC_REQ_F_ABORTED or
+ * when the completion cannot be posted; otherwise it stays alive inside the
+ * completion queue. The krpc reference held by the request is dropped last.
+ *
+ * NOTE(review): the header page is not unmapped here although it is kmap()ed
+ * when the request is built -- confirm whether a kunmap() is required.
+ */
+static void krpc_req_complete(struct krpc_req *kreq, int error)
+{
+	struct pcs_krpc *krpc = kreq->krpc;
+	struct krpc_completion *comp = &kreq->completion;
+	int idx;
+
+	BUG_ON(!comp->xid);
+
+	comp->result = error;
+
+	pcs_mr_put(kreq->hdr_chunk.mr);
+
+	for (idx = 0; idx < kreq->nr_data_chunks; idx++)
+		krpc_chunk_release(&kreq->data_chunks[idx]);
+
+	/* the chunk array is heap-allocated only when it outgrew the inline one */
+	if (kreq->data_chunks != &kreq->inline_data_chunks[0])
+		kfree(kreq->data_chunks);
+
+	spin_lock(&krpc->lock);
+	list_del(&kreq->link);
+
+	if (kreq->flags & KRPC_REQ_F_ABORTED) {
+		krpc_req_free(kreq);
+		spin_unlock(&krpc->lock);
+	} else if (krpc_completion_post(krpc, comp)) {
+		/* krpc_completion_post() has already dropped the lock */
+		krpc_req_free(kreq);
+	}
+
+	pcs_krpc_put(krpc);
+}
+
+/*
+ * get_iter callback of a response message: set up @it so that the bytes of
+ * the response starting at @offset land in the buffers of the originating
+ * request (found through msg->private).
+ *
+ * Without KRPC_REQ_F_RESP_BUFF the whole response goes into the header
+ * buffer. With it, the first sizeof(struct pcs_cs_iohdr) bytes go into the
+ * header buffer and the remainder into the request's data bvecs.
+ */
+static void krpc_msg_get_response_iter(struct pcs_msg *msg, int offset,
+		struct iov_iter *it, unsigned int direction)
+{
+	struct pcs_msg *req = msg->private;
+	struct krpc_req *kreq = req->private2;
+	int iohdr_len = sizeof(struct pcs_cs_iohdr);
+	int payload_off;
+
+	if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF)) {
+		/* No data payload */
+		BUG_ON(msg->size > PAGE_SIZE);
+
+		kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
+		kreq->hdr_kv.iov_len = msg->size;
+
+		iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, msg->size);
+		iov_iter_advance(it, offset);
+		return;
+	}
+
+	/* With data payload: still inside the header part? */
+	if (offset < iohdr_len) {
+		kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
+		kreq->hdr_kv.iov_len = iohdr_len;
+		iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, iohdr_len);
+		iov_iter_advance(it, offset);
+		return;
+	}
+
+	/* Past the header: continue inside the data buffers */
+	payload_off = offset - iohdr_len;
+	BUG_ON(payload_off > kreq->data_len);
+	BUG_ON(msg->size - iohdr_len > kreq->data_len);
+
+	iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
+			kreq->nr_data_bvecs, kreq->data_len);
+	iov_iter_truncate(it, msg->size - iohdr_len);
+	iov_iter_advance(it, payload_off);
+}
+
+/*
+ * Header callback for a krpc-owned rpc: given the header @h of an incoming
+ * message, build the input message that will receive the rest of it.
+ *
+ * Only responses are handled. The request is found by xid and its type must
+ * match the response type with the direction bit cleared. The header is
+ * copied both into the new message and into the request's header buffer.
+ *
+ * Returns the new input message, or NULL when @h is not a response, no
+ * matching request exists, the types disagree or the allocation fails.
+ */
+struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+{
+	struct pcs_msg *req, *reply;
+	struct pcs_rpc_hdr *sent_hdr;
+	struct krpc_req *kreq;
+
+	if (!RPC_IS_RESPONSE(h->type))
+		return NULL;
+
+	req = pcs_rpc_lookup_xid(ep, &h->xid);
+	if (req == NULL)
+		return NULL;
+
+	sent_hdr = (struct pcs_rpc_hdr *)msg_inline_head(req);
+	if (sent_hdr->type != (h->type & ~PCS_RPC_DIRECTION))
+		return NULL;
+
+	reply = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
+	if (!reply)
+		return NULL;
+
+	kreq = req->private2;
+
+	memcpy(reply->_inline_buffer, h, sizeof(struct pcs_rpc_hdr));
+	memcpy(kreq->hdr_buf, h, sizeof(struct pcs_rpc_hdr));
+	reply->size = h->len;
+	reply->private = req;
+	reply->get_iter = krpc_msg_get_response_iter;
+	reply->done = rpc_work_input;
+	pcs_msg_del_calendar(req);
+
+	return reply;
+}
+
+/*
+ * Request-type messages coming from a CS cannot be processed by the PCS KRPC
+ * layer itself, so they are handed to userspace unchanged: the message is
+ * copied into a stand-alone completion and posted on the completion queue.
+ *
+ * The message is silently dropped when the krpc is not connected, when the
+ * completion cannot be allocated, or when posting fails.
+ */
+static void krpc_handle_request(struct pcs_krpc *krpc, struct pcs_msg *msg)
+{
+	struct krpc_completion *fwd;
+
+	if (krpc->state != PCS_KRPC_STATE_CONNECTED)
+		return;
+
+	fwd = kzalloc(sizeof(*fwd) + msg->size, GFP_NOIO);
+	if (!fwd)
+		return;
+
+	fwd->data_len = msg->size;
+	memcpy(fwd->_data_buf, msg->_inline_buffer, msg->size);
+
+	spin_lock(&krpc->lock);
+	/* krpc_completion_post() drops krpc->lock whether or not it succeeds */
+	if (krpc_completion_post(krpc, fwd))
+		kfree(fwd);
+}
+
+/*
+ * Congestion notifications are not interpreted in the kernel for a krpc;
+ * the message is forwarded to userspace.
+ */
+void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg)
+{
+	struct pcs_krpc *krpc = ep->clnt_krpc;
+
+	krpc_handle_request(krpc, msg);
+}
+
+/*
+ * keep_waiting messages are likewise forwarded to userspace; the original
+ * request @req is not consulted.
+ */
+void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+{
+	struct pcs_krpc *krpc = ep->clnt_krpc;
+
+	krpc_handle_request(krpc, msg);
+}
+
+/*
+ * Hash bucket index for a node id: the first unsigned int of the id,
+ * reduced modulo PCS_KRPC_HASH_SIZE.
+ */
+unsigned int pcs_krpc_hash(PCS_NODE_ID_T *id)
+{
+	unsigned int key = *(unsigned int *)id;
+
+	return key % PCS_KRPC_HASH_SIZE;
+}
+
+/*
+ * get_iter callback of an outgoing request: set up @it for the part of the
+ * message that starts at @offset. The message is laid out as
+ *
+ *   header bytes | header padding | data bytes | data padding
+ *
+ * where the padded sizes come from pcs_krpc_msg_size() and both padding
+ * areas are served from the cluster core's nilbuffer.
+ */
+static void krpc_msg_get_data(struct pcs_msg *msg, int offset,
+		struct iov_iter *it, unsigned int direction)
+{
+	struct krpc_req *kreq = msg->private2;
+	int hdr_size = pcs_krpc_msg_size(kreq->hdr_chunk.len, kreq->flags);
+	int data_size = pcs_krpc_msg_size(kreq->data_len, kreq->flags);
+	struct pcs_krpc *krpc = kreq->krpc;
+
+	if (offset < kreq->hdr_chunk.len) {
+		/* inside the real header */
+		iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, kreq->hdr_chunk.len);
+		iov_iter_advance(it, offset);
+	} else if (offset < hdr_size) {
+		/* inside the header padding */
+		iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
+				1, hdr_size - offset);
+	} else if (offset < hdr_size + kreq->data_len) {
+		/* inside the data payload */
+		iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
+				kreq->nr_data_bvecs, kreq->data_len);
+		iov_iter_advance(it, offset - hdr_size);
+	} else {
+		/* inside the data padding; anything further is a caller bug */
+		BUG_ON(offset >= hdr_size + data_size);
+
+		iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
+				1, (hdr_size + data_size - offset));
+	}
+}
+
+/*
+ * Final done callback of a request message: drop the rpc reference still
+ * attached to the message, if any, and complete the owning request with the
+ * message's error value.
+ */
+static void pcs_krpc_response_done(struct pcs_msg *msg)
+{
+	struct krpc_req *owner = msg->private2;
+
+	if (msg->rpc != NULL) {
+		pcs_rpc_put(msg->rpc);
+		msg->rpc = NULL;
+	}
+
+	krpc_req_complete(owner, msg->error.value);
+}
+
+/*
+ * done callback installed while a request is being sent. It switches the
+ * message over to pcs_krpc_response_done(); a message that already carries
+ * an error is completed immediately, any other is handed to pcs_rpc_sent().
+ */
+static void pcs_krpc_msg_sent(struct pcs_msg *msg)
+{
+	msg->done = pcs_krpc_response_done;
+
+	if (!pcs_if_error(&msg->error)) {
+		pcs_rpc_sent(msg);
+		return;
+	}
+
+	pcs_krpc_response_done(msg);
+}
+
+/*
+ * Hand the oldest queued completion to userspace.
+ *
+ * @iocmsg is the caller's kernel copy of the ioctl argument. On success its
+ * xid is set to the xid of the completion. A completion with xid 0 is a
+ * message forwarded from the peer; its payload is copied to the user buffer
+ * described by iocmsg->buf.
+ *
+ * Returns
+ *   1         a completion was delivered,
+ *   0         the completion queue is empty,
+ *   -ENOBUFS  the user buffer is too small for the forwarded message; the
+ *             completion stays queued so the call can be retried,
+ *   -EFAULT   the payload could not be copied to userspace,
+ *   or the negated result code of a failed request.
+ */
+static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_recvmsg *iocmsg)
+{
+	struct krpc_completion *comp;
+	int res;
+
+	spin_lock(&krpc->lock);
+	if (list_empty(&krpc->completion_queue)) {
+		spin_unlock(&krpc->lock);
+		return 0;
+	}
+
+	comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
+
+	/*
+	 * The buffer length is supplied by userspace, so a short buffer must be
+	 * reported as an error rather than trigger BUG_ON(). Check it before
+	 * dequeuing so that the message is not lost.
+	 */
+	if (!comp->result && comp->xid == 0 && iocmsg->buf.len < comp->data_len) {
+		spin_unlock(&krpc->lock);
+		return -ENOBUFS;
+	}
+
+	list_del(&comp->link);
+	krpc->nr_completion--;
+	spin_unlock(&krpc->lock);
+
+	if (comp->result) {
+		res = -comp->result;
+		goto out;
+	}
+
+	res = 1;
+	iocmsg->xid = comp->xid;
+	if (comp->xid == 0) {
+		/* forwarded messages are always posted with a payload */
+		BUG_ON(!comp->data_len);
+		if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len))
+			res = -EFAULT;
+	}
+
+out:
+	krpc_completion_free(comp);
+	return res;
+}
+
+/*
+ * Build a request from the userspace description in @iocmsg and queue it on
+ * the kernel rpc of @krpc.
+ *
+ * The header chunk must be page aligned, at most one page long and backed by
+ * a registered memory region. The descriptors of the data chunks are read
+ * from the end of that header page, growing backwards. Each data chunk is
+ * either part of a registered region (mr_id != 0) or plain user memory that
+ * is pinned for the lifetime of the request.
+ *
+ * Everything in @iocmsg and in the header page is controlled by userspace,
+ * so malformed input is rejected with an error instead of BUG_ON().
+ *
+ * Returns 0 when the request was queued; the result is then reported through
+ * the completion queue. Otherwise returns -EINVAL (bad header chunk or chunk
+ * count), -ENXIO (unknown memory region), -E2BIG (more data pages than
+ * KRPC_MAX_DATA_PAGES), -EFAULT (address not covered by its memory),
+ * -ENOMEM, or the error reported by pcs_umem_get().
+ */
+static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
+{
+	struct krpc_req *kreq;
+	struct pcs_msg *msg;
+	struct pcs_krpc_buf_desc *chunk_bd;
+	struct krpc_chunk *chunk;
+	struct page *hdr_page;
+	struct bio_vec *bvec;
+	int res, i;
+
+	/*
+	 * Header buff is exactly one page, used for staging message header. Will be
+	 * used for receiving header of response message as well.
+	 */
+	if (iocmsg->hdr_chunk.addr & (PAGE_SIZE - 1))
+		return -EINVAL;
+	if (iocmsg->hdr_chunk.len > PAGE_SIZE)
+		return -EINVAL;
+	/* the data chunk descriptors have to fit into that same page */
+	if (iocmsg->nr_data_chunks > PAGE_SIZE / sizeof(struct pcs_krpc_buf_desc))
+		return -EINVAL;
+
+	kreq = krpc_req_alloc();
+	if (!kreq)
+		return -ENOMEM;
+
+	if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) {
+		/* one struct krpc_chunk per data chunk, not one byte */
+		kreq->data_chunks = kcalloc(iocmsg->nr_data_chunks,
+					    sizeof(*kreq->data_chunks), GFP_NOIO);
+		if (!kreq->data_chunks) {
+			res = -ENOMEM;
+			goto err_free_kreq;
+		}
+	} else {
+		kreq->data_chunks = &kreq->inline_data_chunks[0];
+	}
+
+	chunk_bd = &iocmsg->hdr_chunk;
+	chunk = &kreq->hdr_chunk;
+
+	chunk->addr = chunk_bd->addr;
+	chunk->len = chunk_bd->len;
+	chunk->type = KRPC_CHUNK_TYPE_MR;
+
+	chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, iocmsg->hdr_chunk.mr_id);
+	if (!chunk->mr) {
+		res = -ENXIO;
+		goto err_free_kreq;
+	}
+
+	hdr_page = pcs_umem_page(chunk->mr->umem, chunk->addr);
+	if (!hdr_page) {
+		res = -EINVAL;
+		goto err_put_hdr_mr;
+	}
+
+	kreq->hdr_buf = (char *) kmap(hdr_page);
+	kreq->hdr_kv.iov_base = kreq->hdr_buf;
+	kreq->hdr_kv.iov_len = chunk->len;
+
+	/* data chunk buf descriptors are placed at end of header buf, grow backwards */
+	kreq->data_len = 0;
+	kreq->nr_data_chunks = 0;
+	kreq->nr_data_bvecs = 0;
+
+	chunk_bd = (struct pcs_krpc_buf_desc *)(kreq->hdr_buf + PAGE_SIZE) - 1;
+	chunk = kreq->data_chunks;
+	bvec = &kreq->data_bvecs[0];
+
+	for (i = 0; i < iocmsg->nr_data_chunks; i++, chunk++, chunk_bd--) {
+		/* snapshot: the descriptor lives in memory userspace can still modify */
+		struct pcs_krpc_buf_desc bd = *chunk_bd;
+		struct page *page;
+		u64 addr;
+		int offset, len;
+
+		chunk->addr = bd.addr;
+		chunk->len = bd.len;
+		if (bd.mr_id) {
+			chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, bd.mr_id);
+			chunk->type = KRPC_CHUNK_TYPE_MR;
+			if (!chunk->mr) {
+				res = -ENXIO;
+				goto err_put_chunks;
+			}
+		} else {
+			/* unregistered memory buf */
+			chunk->umem = pcs_umem_get(chunk->addr, chunk->len);
+			if (IS_ERR(chunk->umem)) {
+				res = PTR_ERR(chunk->umem);
+				goto err_put_chunks;
+			}
+
+			chunk->type = KRPC_CHUNK_TYPE_UMEM;
+		}
+
+		/*
+		 * Count the chunk as soon as its reference is held, so that the
+		 * error path below releases it too.
+		 */
+		kreq->nr_data_chunks++;
+
+		/* build the bvecs for the data chunk */
+		addr = chunk->addr;
+		len = chunk->len;
+
+		while (len) {
+			/* data bvec array overflow? */
+			if (kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES) {
+				res = -E2BIG;
+				goto err_put_chunks;
+			}
+
+			if (chunk->type == KRPC_CHUNK_TYPE_MR)
+				page = pcs_umem_page(chunk->mr->umem, addr);
+			else
+				page = pcs_umem_page(chunk->umem, addr);
+
+			if (!page) {
+				res = -EFAULT;
+				goto err_put_chunks;
+			}
+
+			offset = addr & (PAGE_SIZE - 1);
+
+			bvec->bv_page = page;
+			bvec->bv_offset = offset;
+
+			bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
+
+			addr += bvec->bv_len;
+			len -= bvec->bv_len;
+			bvec++;
+			kreq->nr_data_bvecs++;
+		}
+
+		kreq->data_len += chunk->len;
+	}
+
+	kreq->completion.xid = iocmsg->xid;
+	kreq->completion.private = kreq;
+
+	kreq->flags = iocmsg->flags;
+
+	msg = &kreq->msg;
+	msg->private = krpc;
+	msg->private2 = kreq;
+
+	INIT_HLIST_NODE(&msg->kill_link);
+	pcs_clear_error(&msg->error);
+
+	msg->size = iocmsg->msg_size;
+	msg->timeout = iocmsg->timeout;
+
+	msg->rpc = NULL;
+	msg->done = pcs_krpc_msg_sent;
+	msg->get_iter = krpc_msg_get_data;
+
+	spin_lock(&krpc->lock);
+	kreq->krpc = pcs_krpc_get(krpc);
+	list_add_tail(&kreq->link, &krpc->pending_queue);
+	spin_unlock(&krpc->lock);
+	/* DTRACE to be added */
+	pcs_rpc_queue(krpc->rpc, msg);
+
+	return 0;
+
+err_put_chunks:
+	for (i = 0; i < kreq->nr_data_chunks; i++) {
+		chunk = &kreq->data_chunks[i];
+		if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
+			pcs_umem_release(chunk->umem);
+		else
+			pcs_mr_put(chunk->mr);
+	}
+	kunmap(hdr_page);
+
+err_put_hdr_mr:
+	pcs_mr_put(kreq->hdr_chunk.mr);
+
+err_free_kreq:
+	if (kreq->data_chunks != &kreq->inline_data_chunks[0])
+		kfree(kreq->data_chunks);
+	krpc_req_free(kreq);
+	return res;
+}
+
+/*
+ * Abort a connected krpc: move it to PCS_KRPC_STATE_ABORTED, flag every
+ * request that is still in flight and throw away all completions userspace
+ * has not collected yet. Does nothing unless the krpc is connected.
+ * Always returns 0.
+ */
+static int pcs_krpc_abort(struct pcs_krpc *krpc)
+{
+	struct krpc_completion *comp, *next;
+	struct krpc_req *kreq;
+
+	spin_lock(&krpc->lock);
+
+	if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
+		spin_unlock(&krpc->lock);
+		return 0;
+	}
+
+	krpc->state = PCS_KRPC_STATE_ABORTED;
+
+	/*
+	 * Flag the incomplete requests where they are. They must stay linked on
+	 * pending_queue because krpc_req_complete() unlinks each one with
+	 * list_del() once the rpc layer is done with it; it then sees
+	 * KRPC_REQ_F_ABORTED and frees the request instead of posting it.
+	 */
+	list_for_each_entry(kreq, &krpc->pending_queue, link)
+		kreq->flags |= KRPC_REQ_F_ABORTED;
+
+	/* dispose all unprocessed completions */
+	list_for_each_entry_safe(comp, next, &krpc->completion_queue, link) {
+		list_del(&comp->link);
+		krpc_completion_free(comp);
+	}
+	krpc->nr_completion = 0;
+
+	spin_unlock(&krpc->lock);
+
+	return 0;
+}
+
+/*
+ * ioctl handler of a krpc file descriptor.
+ *
+ *   PCS_KRPC_IOC_SEND_MSG  queue the message described by the argument
+ *   PCS_KRPC_IOC_RECV_MSG  fetch one completion; the argument describes the
+ *                          receive buffer and is updated and copied back
+ *   PCS_KRPC_IOC_ABORT     abort the krpc
+ *
+ * Returns the result of the invoked helper, -EFAULT when the argument cannot
+ * be copied from or to userspace, and -ENOTTY for an unknown command.
+ */
+static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+	struct pcs_krpc *krpc = file->private_data;
+	int res = 0;
+
+	switch (cmd) {
+	case PCS_KRPC_IOC_SEND_MSG: {
+		struct pcs_krpc_ioc_sendmsg req;
+
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+
+		res = pcs_krpc_ioctl_send_msg(krpc, &req);
+		break;
+	}
+	case PCS_KRPC_IOC_RECV_MSG: {
+		struct pcs_krpc_ioc_recvmsg req;
+
+		/*
+		 * The receive path reads req.buf, so the request has to be
+		 * fetched from userspace first. This also guarantees that no
+		 * uninitialised kernel stack is copied back below.
+		 */
+		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
+			return -EFAULT;
+
+		res = pcs_krpc_ioctl_recv_msg(krpc, &req);
+
+		if (copy_to_user((void __user *)arg, &req, sizeof(req)))
+			return -EFAULT;
+		break;
+	}
+	case PCS_KRPC_IOC_ABORT:
+		res = pcs_krpc_abort(krpc);
+		break;
+	default:
+		res = -ENOTTY;
+		break;
+	}
+
+	return res;
+}
+
+/*
+ * poll handler of a krpc file descriptor.
+ *
+ * An aborted krpc reports EPOLLERR. A connected krpc always reports EPOLLOUT,
+ * plus EPOLLIN while completions are waiting. Any other state reports
+ * nothing.
+ */
+static __poll_t pcs_krpc_poll(struct file *file, poll_table *wait)
+{
+	struct pcs_krpc *krpc = file->private_data;
+	__poll_t events = 0;
+
+	poll_wait(file, &krpc->poll_wait, wait);
+
+	spin_lock(&krpc->lock);
+	if (krpc->state == PCS_KRPC_STATE_CONNECTED) {
+		events = EPOLLOUT;
+		if (!list_empty(&krpc->completion_queue))
+			events |= EPOLLIN;
+	} else if (krpc->state == PCS_KRPC_STATE_ABORTED) {
+		events = EPOLLERR;
+	}
+	spin_unlock(&krpc->lock);
+
+	return events;
+}
+
+/*
+ * release handler of a krpc file descriptor: abort whatever is outstanding
+ * and, if the krpc ended up aborted, put it back into its initial state,
+ * PCS_KRPC_STATE_UNCONN.
+ */
+static int pcs_krpc_release(struct inode *inode, struct file *file)
+{
+	struct pcs_krpc *krpc = file->private_data;
+
+	/* flags the pending requests and drops the queued completions */
+	pcs_krpc_abort(krpc);
+
+	spin_lock(&krpc->lock);
+	if (krpc->state == PCS_KRPC_STATE_ABORTED)
+		krpc->state = PCS_KRPC_STATE_UNCONN;
+	spin_unlock(&krpc->lock);
+
+	return 0;
+}
+
+/* File operations backing a krpc file descriptor. */
+static const struct file_operations pcs_krpc_fops = {
+	.owner		= THIS_MODULE,
+	.llseek		= no_llseek,
+	.poll		= pcs_krpc_poll,
+	.unlocked_ioctl	= pcs_krpc_ioctl,
+	.release	= pcs_krpc_release,
+};
+
+/*
+ * Find the krpc with node id @id in @krpcs.
+ *
+ * Returns the krpc or NULL if there is none. No lock is taken and no
+ * reference is acquired here.
+ */
+struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+	struct hlist_head *bucket = &krpcs->ht[pcs_krpc_hash(id)];
+	struct pcs_krpc *cur;
+
+	hlist_for_each_entry(cur, bucket, hlist) {
+		if (!memcmp(&cur->id, id, sizeof(cur->id)))
+			return cur;
+	}
+
+	return NULL;
+}
+
+/* Take a reference on @krpc and return it */
+struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc)
+{
+ refcount_inc(&krpc->refcnt);
+ return krpc;
+}
+
+/* Drop a reference on @krpc; the krpc is freed when the last one is dropped */
+void pcs_krpc_put(struct pcs_krpc *krpc)
+{
+	if (!refcount_dec_and_test(&krpc->refcnt))
+		return;
+
+	kfree(krpc);
+}
+
+/*
+ * Create a new pcs_krpc for node @id and add it to @krpcs.
+ *
+ * The krpc starts in PCS_KRPC_STATE_UNCONN with one reference and is bound
+ * to the rpc returned by pcs_rpc_clnt_create() for @id, @addr and @cs_flags.
+ *
+ * Returns 0 on success, -ENOMEM if the krpc or its rpc cannot be allocated.
+ */
+int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+		PCS_NET_ADDR_T *addr, int cs_flags)
+{
+	struct pcs_krpc *new;
+	struct pcs_rpc *rpc;
+
+	new = kzalloc(sizeof(*new), GFP_NOIO);
+	if (!new)
+		return -ENOMEM;
+
+	spin_lock_init(&new->lock);
+	refcount_set(&new->refcnt, 1);
+	init_waitqueue_head(&new->poll_wait);
+	INIT_HLIST_NODE(&new->hlist);
+	INIT_LIST_HEAD(&new->link);
+	INIT_LIST_HEAD(&new->pending_queue);
+	INIT_LIST_HEAD(&new->completion_queue);
+
+	new->state = PCS_KRPC_STATE_UNCONN;
+	new->id = *id;
+	new->krpcs = krpcs;
+
+	rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags);
+	if (!rpc) {
+		kfree(new);
+		return -ENOMEM;
+	}
+	new->rpc = rpc;
+	rpc->clnt_krpc = new;
+
+	/* publish: the set lock nests inside the krpc lock */
+	spin_lock(&new->lock);
+	spin_lock(&krpcs->lock);
+	hlist_add_head(&new->hlist, &krpcs->ht[pcs_krpc_hash(&new->id)]);
+	list_add_tail(&new->link, &krpcs->list);
+	krpcs->nkrpc++;
+	spin_unlock(&krpcs->lock);
+	spin_unlock(&new->lock);
+
+	return 0;
+}
+
+/*
+ * Change the peer address of the krpc with node id @id.
+ *
+ * The rpc's PCS_RPC_F_LOCAL flag is set when CS_FL_LOCAL_SOCK is present in
+ * @flags and cleared otherwise.
+ *
+ * Returns 0 on success, -ENXIO if no krpc with @id exists in @krpcs.
+ */
+int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+		PCS_NET_ADDR_T *addr, int flags)
+{
+	struct pcs_krpc *krpc = pcs_krpc_lookup(krpcs, id);
+	struct pcs_rpc *rpc;
+
+	if (!krpc)
+		return -ENXIO;
+
+	rpc = krpc->rpc;
+	pcs_rpc_set_address(rpc, addr);
+
+	if (!(flags & CS_FL_LOCAL_SOCK))
+		rpc->flags &= ~PCS_RPC_F_LOCAL;
+	else
+		rpc->flags |= PCS_RPC_F_LOCAL;
+
+	return 0;
+}
+/*
+ * Connect to a pcs_krpc, return a valid fd on success.
+ *
+ * @krpcs: set the krpc is looked up in
+ * @id:    node id of the krpc
+ *
+ * Returns a new O_CLOEXEC file descriptor referring to a '[pcs_krpc]'
+ * anonymous inode, -ENXIO if no krpc with @id exists, -EPERM if the krpc is
+ * not in PCS_KRPC_STATE_UNCONN, or the error of the fd/file allocation.
+ */
+int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+	struct pcs_krpc *krpc;
+	struct file *file;
+	int fd;
+
+	krpc = pcs_krpc_lookup(krpcs, id);
+	if (!krpc)
+		return -ENXIO;
+
+	WARN_ON(krpc->state != PCS_KRPC_STATE_UNCONN);
+	if (krpc->state != PCS_KRPC_STATE_UNCONN)
+		return -EPERM;
+
+	fd = get_unused_fd_flags(O_CLOEXEC);
+	if (fd < 0)
+		return fd;
+
+	file = anon_inode_getfile("[pcs_krpc]", &pcs_krpc_fops, krpc, 0);
+	if (IS_ERR(file)) {
+		put_unused_fd(fd);
+		return PTR_ERR(file);
+	}
+
+	/*
+	 * the krpc should always be connected regardless state of
+	 * underlying RPC.
+	 *
+	 * The state is switched before fd_install(): once the fd is installed
+	 * userspace may use it right away, so poll/ioctl/release must never
+	 * see a krpc that is still marked unconnected. Nothing can fail past
+	 * this point. The lock matches the other writers of krpc->state.
+	 */
+	spin_lock(&krpc->lock);
+	krpc->state = PCS_KRPC_STATE_CONNECTED;
+	spin_unlock(&krpc->lock);
+
+	fd_install(fd, file);
+
+	return fd;
+}
+
+/*
+ * Tear down @krpc: mark it PCS_KRPC_STATE_DESTROYED, unlink it from its krpc
+ * set, detach and close the underlying rpc (if any) and drop one reference.
+ */
+static void __pcs_krpc_destroy(struct pcs_krpc *krpc)
+{
+ spin_lock(&krpc->lock);
+
+ krpc->state = PCS_KRPC_STATE_DESTROYED;
+
+ /* Remove from the krpc set; the set lock nests inside krpc->lock */
+ spin_lock(&krpc->krpcs->lock);
+ hlist_del(&krpc->hlist);
+ list_del(&krpc->link);
+ krpc->krpcs->nkrpc--;
+ spin_unlock(&krpc->krpcs->lock);
+
+ spin_unlock(&krpc->lock);
+
+ if (krpc->rpc) {
+ /* clear the rpc's back pointer before giving up the rpc */
+ krpc->rpc->clnt_krpc = NULL;
+ pcs_rpc_clnt_close(krpc->rpc);
+ krpc->rpc = NULL;
+ }
+ pcs_krpc_put(krpc);
+}
+
+/*
+ * Destroy the krpc with node id @id.
+ *
+ * Only a krpc in PCS_KRPC_STATE_UNCONN can be destroyed. A bad id or a krpc
+ * that is still in use is reported to the caller instead of crashing the
+ * kernel, in line with pcs_krpc_connect() and pcs_krpc_update_addr().
+ *
+ * Returns 0 on success, -ENXIO if no krpc with @id exists in @krpcs,
+ * -EBUSY if the krpc is not in PCS_KRPC_STATE_UNCONN.
+ */
+int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
+{
+	struct pcs_krpc *krpc;
+
+	krpc = pcs_krpc_lookup(krpcs, id);
+	if (!krpc)
+		return -ENXIO;
+
+	if (krpc->state != PCS_KRPC_STATE_UNCONN)
+		return -EBUSY;
+
+	__pcs_krpc_destroy(krpc);
+	return 0;
+}
+
+/* Initialise @krpcs as an empty set: empty hash buckets, empty list, zero count */
+void pcs_krpcset_init(struct pcs_krpc_set *krpcs)
+{
+	struct hlist_head *bucket;
+
+	spin_lock_init(&krpcs->lock);
+
+	krpcs->nkrpc = 0;
+	INIT_LIST_HEAD(&krpcs->list);
+
+	for (bucket = krpcs->ht; bucket < krpcs->ht + PCS_KRPC_HASH_SIZE; bucket++)
+		INIT_HLIST_HEAD(bucket);
+}
+
+/*
+ * Abort and destroy every krpc still linked in @krpcs. The set has to be
+ * empty afterwards, which is enforced with BUG_ON().
+ */
+void pcs_krpcset_fini(struct pcs_krpc_set *krpcs)
+{
+ spin_lock(&krpcs->lock);
+ while (!list_empty(&krpcs->list)) {
+ struct pcs_krpc *krpc;
+
+ krpc = list_first_entry(&krpcs->list, struct pcs_krpc, link);
+ /*
+ * The set lock is dropped around the teardown because
+ * __pcs_krpc_destroy() takes krpcs->lock itself to unlink the krpc.
+ */
+ spin_unlock(&krpcs->lock);
+ pcs_krpc_abort(krpc);
+ __pcs_krpc_destroy(krpc);
+ spin_lock(&krpcs->lock);
+ }
+ spin_unlock(&krpcs->lock);
+
+ BUG_ON(!list_empty(&krpcs->list));
+ BUG_ON(krpcs->nkrpc != 0);
+}
+
+/*
+ * Create the slab cache for struct krpc_req.
+ *
+ * Returns 0 on success, -ENOMEM if the cache cannot be created.
+ */
+int __init pcs_krpc_init(void)
+{
+	krpc_req_cachep = kmem_cache_create("pcs_krpc_req", sizeof(struct krpc_req),
+					    0, SLAB_RECLAIM_ACCOUNT | SLAB_ACCOUNT,
+					    NULL);
+
+	return krpc_req_cachep ? 0 : -ENOMEM;
+}
+
+/* Destroy the slab cache created by pcs_krpc_init() */
+void pcs_krpc_fini(void)
+{
+ kmem_cache_destroy(krpc_req_cachep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
new file mode 100644
index 000000000000..6a257ba80208
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc.h
@@ -0,0 +1,140 @@
+/*
+ * fs/fuse/kio/pcs/pcs_krpc.h
+ *
+ * Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+#ifndef _PCS_KRPC_H_
+#define _PCS_KRPC_H_ 1
+#include <linux/types.h>
+#include <linux/bvec.h>
+
+#include "pcs_prot_types.h"
+#include "pcs_perfcounters.h"
+#include "pcs_rpc_clnt.h"
+#include "pcs_krpc_prot.h"
+#include "pcs_mr.h"
+
+/*
+ * One chunk of a message buffer, backed by either a pcs_mr or a pcs_umem.
+ * NOTE(review): @type presumably selects the valid union member - confirm
+ * against the users in pcs_krpc.c.
+ */
+struct krpc_chunk {
+ u64 addr;
+ u32 len;
+ u32 type; /* one of KRPC_CHUNK_TYPE_* */
+#define KRPC_CHUNK_TYPE_MR 0
+#define KRPC_CHUNK_TYPE_UMEM 1
+ union {
+ struct pcs_mr *mr;
+ struct pcs_umem *umem;
+ };
+};
+
+#define PCS_KRPC_HASH_SIZE 1024
+/* Table of all krpcs: hashed by node id and also linked in a plain list */
+struct pcs_krpc_set {
+ /* Set of krpcs */
+ struct hlist_head ht[PCS_KRPC_HASH_SIZE]; /* buckets, indexed by pcs_krpc_hash(id) */
+ struct list_head list; /* all krpcs, linked via pcs_krpc.link */
+ unsigned int nkrpc; /* number of krpcs in the set */
+
+ spinlock_t lock; /* held while ht, list and nkrpc are modified */
+};
+
+/* Values of pcs_krpc.state */
+enum {
+ PCS_KRPC_STATE_UNCONN, /* initial state; also restored on release of an aborted krpc */
+ PCS_KRPC_STATE_CONNECTED, /* set by pcs_krpc_connect() */
+ PCS_KRPC_STATE_ABORTED, /* poll reports EPOLLERR in this state */
+ PCS_KRPC_STATE_DESTROYED, /* set by __pcs_krpc_destroy() */
+};
+
+/* Userspace-visible RPC endpoint layered on top of a kernel pcs_rpc */
+struct pcs_krpc {
+ struct hlist_node hlist; /* in krpcs->ht */
+ struct list_head link; /* in krpcs->list */
+ spinlock_t lock;
+ refcount_t refcnt; /* see pcs_krpc_get()/pcs_krpc_put() */
+
+ struct pcs_rpc *rpc; /* underlying kernel rpc, from pcs_rpc_clnt_create() */
+
+ struct pcs_krpc_set *krpcs; /* set this krpc belongs to */
+
+ PCS_NODE_ID_T id; /* node id, the lookup key */
+
+ int state; /* PCS_KRPC_STATE_* */
+
+ struct list_head pending_queue;
+ struct list_head completion_queue; /* krpc_completion entries; non-empty => EPOLLIN */
+ int nr_completion;
+
+ /** Wait queue head for poll */
+ wait_queue_head_t poll_wait;
+};
+
+/*
+ * Completion message to be received by userspace
+ */
+struct krpc_completion {
+	struct list_head link; /* in krpc->completion_queue */
+
+	u64 xid;
+	int result;
+
+	void *private;
+	int data_len;
+	/*
+	 * Trailing variable-length data. Declared as a C99 flexible array
+	 * member; zero-length arrays are deprecated in the kernel. The
+	 * structure size is unchanged.
+	 */
+	u8 _data_buf[];
+};
+
+#define KRPC_MAX_DATA_PAGES 256
+#define NR_KRPC_DATA_CHUNKS_INLINE 4
+/*
+ * Context of one krpc request. The slab cache krpc_req_cachep is sized for
+ * this structure.
+ */
+struct krpc_req {
+ struct list_head link;
+ spinlock_t lock;
+ struct pcs_krpc *krpc;
+
+#define KRPC_REQ_F_ALIGNMENT PCS_KRPC_MSG_F_ALIGNMENT
+#define KRPC_REQ_F_RESP_BUFF PCS_KRPC_MSG_F_RESP_BUFF /* data buff is for read response */
+#define KRPC_REQ_F_ABORTED 0x10000
+ int flags; /* KRPC_REQ_F_* */
+
+ struct pcs_msg msg;
+
+ char *hdr_buf;
+ struct kvec hdr_kv;
+ struct krpc_chunk hdr_chunk;
+
+ int nr_data_chunks;
+ /* NOTE(review): presumably points at inline_data_chunks for small requests - confirm */
+ struct krpc_chunk *data_chunks;
+ struct krpc_chunk inline_data_chunks[NR_KRPC_DATA_CHUNKS_INLINE];
+
+ int data_len;
+ int nr_data_bvecs;
+ struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES];
+
+ struct krpc_completion completion;
+};
+
+/*
+ * Size taken by a message of @size bytes: rounded up to
+ * PCS_KRPC_MSG_ALIGNMENT when PCS_KRPC_MSG_F_ALIGNMENT is set in @flags,
+ * @size itself otherwise.
+ */
+static inline u32 pcs_krpc_msg_size(u32 size, u8 flags)
+{
+	return (flags & PCS_KRPC_MSG_F_ALIGNMENT) ?
+		ALIGN(size, PCS_KRPC_MSG_ALIGNMENT) : size;
+}
+
+int pcs_krpc_init(void);
+void pcs_krpc_fini(void);
+
+void pcs_krpcset_init(struct pcs_krpc_set *krpcs);
+void pcs_krpcset_fini(struct pcs_krpc_set *krpcs);
+
+struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+ PCS_NET_ADDR_T *addr, int cs_flags);
+int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
+ PCS_NET_ADDR_T *addr, int flags);
+int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
+
+void pcs_krpc_put(struct pcs_krpc *krpc);
+struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc);
+
+struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
+void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
+void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg);
+#endif
diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h
new file mode 100644
index 000000000000..525f4e04d2cc
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h
@@ -0,0 +1,44 @@
+#ifndef _PCS_KRPC_PROT_H_
+#define _PCS_KRPC_PROT_H_
+#include <linux/ioctl.h>
+
+#include "pcs_prot_types.h"
+
+/*Device ioctls: */
+#define PCS_KRPC_IOC_MAGIC 255
+
+/* ioctl cmds supported by the '[pcs_krpc]' anonymous inode */
+#define PCS_KRPC_IOC_SEND_MSG _IO(PCS_KRPC_IOC_MAGIC, 10)
+#define PCS_KRPC_IOC_RECV_MSG _IO(PCS_KRPC_IOC_MAGIC, 11)
+#define PCS_KRPC_IOC_ABORT _IO(PCS_KRPC_IOC_MAGIC, 12)
+
+/* Descriptor of a userspace buffer passed through the krpc ioctls */
+struct pcs_krpc_buf_desc {
+ u64 addr; /* buf address in userspace. */
+ u32 len; /* size of the buf */
+ u32 mr_id; /* mr id */
+};
+
+#define PCS_KRPC_MSG_ALIGNMENT (512ULL)
+
+#define PCS_KRPC_MSG_F_ALIGNMENT 0x01
+#define PCS_KRPC_MSG_F_RESP_BUFF 0x02
+/*
+ * Argument of PCS_KRPC_IOC_SEND_MSG.
+ *
+ * To avoid copying, a msg is sent to the kernel as an array of buffer
+ * descriptors. Each buffer descriptor points to a buffer that contains a
+ * chunk of the msg, and all chunk buffers come from registered MRs.
+ */
+struct pcs_krpc_ioc_sendmsg {
+ u64 xid; /* context id */
+ u32 msg_size; /* total size of the msg */
+ u16 timeout; /* timeout */
+ u8 flags; /* PCS_KRPC_MSG_F_* (alignment, response buffer) */
+ u8 nr_data_chunks; /* total number of data chunks */
+ struct pcs_krpc_buf_desc hdr_chunk; /* the buf holding msg header */
+};
+
+/* Argument of PCS_KRPC_IOC_RECV_MSG; copied back to userspace after the call */
+struct pcs_krpc_ioc_recvmsg {
+ u64 xid; /* context id */
+ struct pcs_krpc_buf_desc buf; /* for cs congestion notification and rpc keep waiting msg.*/
+};
+
+#endif
diff --git a/fs/fuse/kio/pcs/pcs_mr.c b/fs/fuse/kio/pcs/pcs_mr.c
index ad55b0cbad8b..c2a2c072ba9e 100644
--- a/fs/fuse/kio/pcs/pcs_mr.c
+++ b/fs/fuse/kio/pcs/pcs_mr.c
@@ -126,8 +126,10 @@ int pcs_reg_mr(struct pcs_mr_set *mrs, u64 start, u64 len)
}
umem = pcs_umem_get(start, len);
- if (IS_ERR(umem))
+ if (IS_ERR(umem)) {
+ atomic_dec(&mrs->mr_num);
return PTR_ERR(umem);
+ }
mr = kzalloc(sizeof(*mr), GFP_KERNEL);
if (!mr) {
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index cdf1c9754803..b88e56d2b9a2 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -18,6 +18,8 @@
#include "fuse_ktrace_prot.h"
#include "fuse_stat.h"
#include "../../fuse_i.h"
+#include "pcs_mr.h"
+#include "pcs_krpc.h"
///////////////////////////
@@ -168,7 +170,7 @@ struct pcs_int_request
union {
struct {
struct pcs_map_entry *map;
- //// Temproraly disable flow
+ /* Temporarily disable flow */
struct pcs_flow_node *flow;
u8 cmd;
u8 role;
@@ -253,7 +255,7 @@ struct pcs_int_request
};
};
-// FROM pcs_cluste_core.h
+/* FROM pcs_cluster_core.h */
struct pcs_clnt_config
{
@@ -284,9 +286,15 @@ struct pcs_cluster_core
struct pcs_map_set maps; /* Global map data */
struct pcs_rpc_engine eng; /* RPC engine */
struct workqueue_struct *wq;
-//// struct pcs_ratelimit rlim; /* Rate limiter */
-//// struct pcs_rng rng;
+#if 0
+ struct pcs_ratelimit rlim; /* Rate limiter */
+ struct pcs_rng rng;
+#endif
/* <SKIP */
+
+ struct pcs_mr_set mrs; /* Table of all MRs*/
+ struct pcs_krpc_set krpcs; /* Table of all KRPCs */
+
struct pcs_fuse_stat stat;
struct {
@@ -337,10 +345,20 @@ static inline struct pcs_cluster_core *cc_from_maps(struct pcs_map_set *maps)
return container_of(maps, struct pcs_cluster_core, maps);
}
+/* Map a krpc set back to the pcs_cluster_core that embeds it as ->krpcs */
+static inline struct pcs_cluster_core *cc_from_krpcset(struct pcs_krpc_set *krpcs)
+{
+ return container_of(krpcs, struct pcs_cluster_core, krpcs);
+}
+
+/* Map a krpc to the pcs_cluster_core owning the set it belongs to */
+static inline struct pcs_cluster_core *cc_from_krpc(struct pcs_krpc *krpc)
+{
+ return cc_from_krpcset(krpc->krpcs);
+}
+
void pcs_cc_submit(struct pcs_cluster_core *cc, struct pcs_int_request* ireq);
void pcs_cc_requeue(struct pcs_cluster_core *cc, struct list_head * q);
void pcs_cc_update_storage_versions(struct pcs_cluster_core *cc, int version);
-////// FROM pcs_cluster.h
+/* FROM pcs_cluster.h */
static inline void pcs_sreq_attach(struct pcs_int_request * sreq, struct pcs_int_request * parent)
{
sreq->completion_data.parent = parent;
@@ -421,4 +439,6 @@ typedef void (*kio_req_itr)(struct fuse_file *ff, struct fuse_req *req,
void pcs_kio_req_list(struct fuse_conn *fc, kio_req_itr kreq_cb, void *ctx);
extern struct crypto_shash *crc_tfm;
+
+
#endif /* _PCS_REQ_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 3b763cea9c01..deebc1dddf1b 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -292,7 +292,8 @@ void pcs_rpc_attach_new_ep(struct pcs_rpc * ep, struct pcs_rpc_engine * eng)
ep->peer_flags = 0;
ep->peer_version = ~0U;
ep->conn = NULL;
- ep->private = NULL;
+ ep->clnt_cs = NULL;
+ ep->clnt_krpc = NULL;
INIT_LIST_HEAD(&ep->pending_queue);
INIT_LIST_HEAD(&ep->state_queue);
INIT_LIST_HEAD(&ep->input_queue);
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index ef4ab26b9d44..f7df01ef9a49 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -153,7 +153,9 @@ struct pcs_rpc
struct hlist_head kill_calendar[RPC_MAX_CALENDAR];
struct llist_node cleanup_node;
- struct pcs_cs * private;
+ struct pcs_cs *clnt_cs;
+ struct pcs_krpc *clnt_krpc;
+ int nr_clnts;
};
struct pcs_rpc_engine
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.c b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
new file mode 100644
index 000000000000..11b7c3175bf5
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
@@ -0,0 +1,188 @@
+/*
+ * fs/fuse/kio/pcs/pcs_rpc_clnt.c
+ *
+ * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include <linux/types.h>
+
+#include "pcs_types.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "pcs_sock_io.h"
+#include "pcs_sock_conn.h"
+#include "pcs_rdma_conn.h"
+#include "pcs_net_addr.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+#include "pcs_cs.h"
+#include "pcs_krpc.h"
+
+/*
+ * demux_request callback: handle an incoming request message.
+ *
+ * Only PCS_CS_CONG_NOTIFY is supported; it is forwarded to the cs client
+ * and/or the krpc client attached to @ep. Any other type is logged and
+ * rejected with PCS_ERR_PROTOCOL.
+ */
+static int clnt_input(struct pcs_rpc *ep, struct pcs_msg *msg)
+{
+	struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
+
+	if (h->type != PCS_CS_CONG_NOTIFY) {
+		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
+		return PCS_ERR_PROTOCOL;
+	}
+
+	if (ep->clnt_cs)
+		cs_handle_congestion(ep->clnt_cs, h);
+
+	if (ep->clnt_krpc)
+		krpc_handle_congestion(ep, msg);
+
+	return 0;
+}
+
+
+/*
+ * get_hdr callback: a header whose xid origin has PCS_NODE_ALT_MASK set is
+ * handed to cs_get_hdr(), any other header to krpc_get_hdr().
+ */
+static struct pcs_msg *clnt_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
+{
+	if (!(h->xid.origin.val & PCS_NODE_ALT_MASK))
+		return krpc_get_hdr(ep, h);
+
+	return cs_get_hdr(ep, h);
+}
+
+/* state_change callback of clnt_rpc_ops: reset the rpc; @error is not used */
+void clnt_aborting(struct pcs_rpc *ep, int error)
+{
+ pcs_rpc_reset(ep);
+}
+
+/*
+ * keep_waiting callback: dispatch on the xid origin of the request @req.
+ * PCS_NODE_ALT_MASK set goes to cs_keep_waiting(), anything else to
+ * krpc_keep_waiting().
+ */
+static void clnt_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
+{
+	struct pcs_rpc_hdr *req_h = (struct pcs_rpc_hdr *)msg_inline_head(req);
+
+	if (!(req_h->xid.origin.val & PCS_NODE_ALT_MASK)) {
+		krpc_keep_waiting(ep, req, msg);
+		return;
+	}
+
+	cs_keep_waiting(ep, req, msg);
+}
+
+/*
+ * connect callback of clnt_rpc_ops: start connecting @ep.
+ *
+ * With PCS_RPC_F_LOCAL set a unix socket under PCS_SHM_DIR is used, otherwise
+ * the network address in ep->addr (RDMA or socket, depending on its type).
+ * On failure PCS_RPC_ERR_CONNECT_ERROR is reported and the rpc is reset.
+ */
+static void clnt_connect(struct pcs_rpc *ep)
+{
+ void (*connect_start)(struct pcs_rpc *);
+
+ if (ep->flags & PCS_RPC_F_LOCAL) {
+ char path[128];
+
+ snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
+ (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
+
+ /* the path, including its terminating NUL, must fit into sun_path */
+ if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
+ TRACE("Path to local socket is too long: %s", path);
+
+ /* NOTE(review): presumably so that a later attempt uses ep->addr - confirm */
+ ep->flags &= ~PCS_RPC_F_LOCAL;
+ goto fail;
+ }
+ memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
+ ep->sh.sun.sun_family = AF_UNIX;
+ ep->sh.sa_len = sizeof(struct sockaddr_un);
+ /* length was checked against sun_path above */
+ strcpy(ep->sh.sun.sun_path, path);
+ connect_start = pcs_sockconnect_start;
+ } else {
+ /* TODO: print sock addr using pcs_format_netaddr() */
+ if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+ TRACE("netaddr to sockaddr failed");
+ goto fail;
+ }
+ connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
+ pcs_rdmaconnect_start : pcs_sockconnect_start;
+ }
+ ep->state = PCS_RPC_CONNECT;
+ connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+ return;
+
+fail:
+ pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
+ pcs_rpc_reset(ep);
+}
+
+/* Parameters applied to every rpc created by pcs_rpc_clnt_create() */
+struct pcs_rpc_params clnt_rpc_params = {
+ .alloc_hdr_size = sizeof(struct pcs_rpc_hdr),
+ .max_msg_size = PCS_CS_MSG_MAX_SIZE,
+ .holddown_timeout = HZ,
+ .connect_timeout = 5*HZ,
+ .response_timeout = 30*HZ,
+ .max_conn_retry = 3,
+ .flags = 0,
+};
+
+/* Callbacks of an rpc shared by the pcs_cs and pcs_krpc clients */
+struct pcs_rpc_ops clnt_rpc_ops = {
+ .demux_request = clnt_input,
+ .get_hdr = clnt_get_hdr,
+ .state_change = clnt_aborting,
+ .connect = clnt_connect,
+ .keep_waiting = clnt_keep_waiting,
+};
+
+
+/*
+ * Get an rpc to peer @peer_id for one more client.
+ *
+ * An existing rpc of @eng with the same peer id is reused unless it is
+ * already in PCS_RPC_DESTROY; otherwise a new one is allocated and configured
+ * with clnt_rpc_params/clnt_rpc_ops, @peer_addr and the local-socket setting
+ * from @flags (CS_FL_LOCAL_SOCK).
+ *
+ * The returned rpc carries one reference and one nr_clnts count for the
+ * caller; both are given back with pcs_rpc_clnt_close().
+ *
+ * Returns the rpc, or NULL if a new one cannot be allocated.
+ */
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, PCS_NODE_ID_T *peer_id,
+		PCS_NET_ADDR_T *peer_addr, int flags)
+{
+	struct pcs_rpc *ep = NULL;
+
+	/*
+	 * It's not expected this function to be called frequently,
+	 * slow version of search is acceptable here.
+	 */
+	spin_lock(&eng->lock);
+	hlist_for_each_entry(ep, &eng->unhashed, link) {
+		if (memcmp(&ep->peer_id, peer_id, sizeof(ep->peer_id)) == 0) {
+			pcs_rpc_get(ep);
+			break;
+		}
+	}
+	spin_unlock(&eng->lock);
+
+	if (ep) {
+		mutex_lock(&ep->mutex);
+		if (ep->state != PCS_RPC_DESTROY)
+			goto found;
+
+		mutex_unlock(&ep->mutex);
+		/*
+		 * The rpc we found had been closed by its last owner and
+		 * cannot be reused: drop the reference taken during the
+		 * lookup, otherwise it would be leaked.
+		 */
+		pcs_rpc_put(ep);
+	}
+
+	/* no usable rpc for this peer, create a new pcs_rpc instance */
+	ep = pcs_rpc_alloc_ep();
+	if (!ep)
+		return NULL;
+
+	pcs_rpc_attach_new_ep(ep, eng);
+	pcs_rpc_configure_new_ep(ep, &clnt_rpc_params, &clnt_rpc_ops);
+
+	pcs_rpc_set_peer_id(ep, peer_id, PCS_NODE_ROLE_CS);
+	pcs_rpc_set_address(ep, peer_addr);
+
+	if (flags & CS_FL_LOCAL_SOCK)
+		ep->flags |= PCS_RPC_F_LOCAL;
+	else
+		ep->flags &= ~PCS_RPC_F_LOCAL;
+
+	mutex_lock(&ep->mutex);
+found:
+	/* reached with ep->mutex held on both paths */
+	ep->nr_clnts++;
+	mutex_unlock(&ep->mutex);
+
+	return ep;
+}
+
+/*
+ * Give up one client of @ep, the counterpart of pcs_rpc_clnt_create().
+ *
+ * When the last client goes away the rpc is marked PCS_RPC_F_DEAD, aborted
+ * with PCS_ERR_NET_ABORT and moved to PCS_RPC_DESTROY. One reference on @ep
+ * is dropped in any case.
+ */
+void pcs_rpc_clnt_close(struct pcs_rpc *ep)
+{
+ mutex_lock(&ep->mutex);
+ BUG_ON(ep->flags & PCS_RPC_F_DEAD);
+ BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
+
+ ep->nr_clnts--;
+ if (!ep->nr_clnts) {
+ /* close the rpc if we're the last rpc client */
+ ep->flags |= PCS_RPC_F_DEAD;
+ rpc_abort(ep, 1, PCS_ERR_NET_ABORT);
+ ep->state = PCS_RPC_DESTROY;
+ }
+ mutex_unlock(&ep->mutex);
+
+ pcs_rpc_put(ep);
+}
diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.h b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
new file mode 100644
index 000000000000..7afe59e9c992
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
@@ -0,0 +1,16 @@
+/*
+ * fs/fuse/kio/pcs/pcs_rpc_clnt.h
+ *
+ * Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#ifndef _PCS_RPC_CLNT_H_
+#define _PCS_RPC_CLNT_H_ 1
+#include "pcs_rpc.h"
+
+struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng,
+ PCS_NODE_ID_T *peer_id, PCS_NET_ADDR_T *peer_addr, int flags);
+void pcs_rpc_clnt_close(struct pcs_rpc *ep);
+
+#endif
--
2.39.3 (Apple Git-146)
More information about the Devel
mailing list