[Devel] [PATCH VZ9 v2 2/2] fs/fuse kio: introduce pcs_krpc - export kernel RPC to userspace

Liu Kui kui.liu at virtuozzo.com
Thu May 23 13:58:34 MSK 2024


Resend the patches with comment issues fixed.

Thanks Vasily for pointing out the problem.

On 23/5/24 6:55 pm, Liu Kui wrote:
> Currently there are 2 connections for every RPC, one in userspace,
> one in kernel. This wastes a lot of resources on client hosts in case
> of a huge cluster. It's therefore desirable to eliminate connections
> in userspace by using kernel RPC directly for userspace RPC.
>
> This patch makes the pcs_fuse_kio module provide such functionality for
> userspace to use. Major changes are:
>
> - Introduce a new struct pcs_krpc, which links userspace RPC connection
> directly to corresponding kernel RPC. It also provides execution context
> for forwarding RPC messages from/to kernel RPC.
> - Add several new cmds to the FUSE_IOC_KIO_CALL ioctl for pcs_krpc
> management.
> - Change kernel RPC ops to demux msgs to either pcs_cs or pcs_krpc
>
> https://pmc.acronis.work/browse/VSTOR-82613
> Signed-off-by: Liu Kui <kui.liu at virtuozzo.com>
> ---
>   fs/fuse/Makefile                   |   6 +-
>   fs/fuse/dev.c                      |  19 +-
>   fs/fuse/fuse_i.h                   |   2 +-
>   fs/fuse/kio/pcs/pcs_cluster_core.c |   4 +
>   fs/fuse/kio/pcs/pcs_cs.c           | 120 +----
>   fs/fuse/kio/pcs/pcs_cs.h           |   5 +-
>   fs/fuse/kio/pcs/pcs_fuse_kdirect.c |  96 +++-
>   fs/fuse/kio/pcs/pcs_ioctl.h        |  29 ++
>   fs/fuse/kio/pcs/pcs_krpc.c         | 794 +++++++++++++++++++++++++++++
>   fs/fuse/kio/pcs/pcs_krpc.h         | 140 +++++
>   fs/fuse/kio/pcs/pcs_krpc_prot.h    |  44 ++
>   fs/fuse/kio/pcs/pcs_mr.c           |   4 +-
>   fs/fuse/kio/pcs/pcs_req.h          |  30 +-
>   fs/fuse/kio/pcs/pcs_rpc.c          |   3 +-
>   fs/fuse/kio/pcs/pcs_rpc.h          |   4 +-
>   fs/fuse/kio/pcs/pcs_rpc_clnt.c     | 188 +++++++
>   fs/fuse/kio/pcs/pcs_rpc_clnt.h     |  16 +
>   17 files changed, 1386 insertions(+), 118 deletions(-)
>   create mode 100644 fs/fuse/kio/pcs/pcs_krpc.c
>   create mode 100644 fs/fuse/kio/pcs/pcs_krpc.h
>   create mode 100644 fs/fuse/kio/pcs/pcs_krpc_prot.h
>   create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.c
>   create mode 100644 fs/fuse/kio/pcs/pcs_rpc_clnt.h
>
> diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
> index 740c805adc2a..18eaa35a234b 100644
> --- a/fs/fuse/Makefile
> +++ b/fs/fuse/Makefile
> @@ -33,7 +33,11 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
>   	kio/pcs/pcs_rdma_rw.o \
>   	kio/pcs/pcs_rdma_conn.o \
>   	kio/pcs/pcs_net_addr.o \
> -	kio/pcs/pcs_cs_accel.o
> +	kio/pcs/pcs_cs_accel.o \
> +	kio/pcs/pcs_rpc_clnt.o \
> +	kio/pcs/pcs_mr.o \
> +	kio/pcs/pcs_krpc.o
> +
>   fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o
>   
>   virtiofs-y := virtio_fs.o
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index e00f5b180e95..bf54adf2bca9 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -2657,11 +2657,20 @@ static long fuse_dev_ioctl(struct file *file, unsigned int cmd,
>   
>   		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
>   			return -EFAULT;
> -		op = fuse_kio_get(NULL, req.name);
> -		if (op == NULL)
> -			return -EINVAL;
> -		res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
> -		fuse_kio_put(op);
> +
> +		fud = fuse_get_dev(file);
> +		if (fud) {
> +			op = fud->fc->kio.op;
> +			if (op == NULL)
> +				return -EINVAL;
> +			res = op->dev_ioctl(fud->fc, req.cmd, req.data, req.len);
> +		} else {
> +			op = fuse_kio_get(NULL, req.name);
> +			if (op == NULL)
> +				return -EINVAL;
> +			res = op->ioctl(NULL, NULL, req.cmd, req.data, req.len);
> +			fuse_kio_put(op);
> +		}
>   		break;
>   	}
>   	default:
> diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
> index 0c1bd5209dbc..090871a1e356 100644
> --- a/fs/fuse/fuse_i.h
> +++ b/fs/fuse/fuse_i.h
> @@ -649,7 +649,7 @@ struct fuse_kio_ops {
>   	void (*inode_release)(struct fuse_inode *fi);
>   	void (*kill_requests)(struct fuse_conn *fc, struct inode *inode);
>   	int  (*ioctl)(struct file *file, struct inode *inode, unsigned int cmd, unsigned long arg, int len);
> -
> +	int  (*dev_ioctl)(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len);
>   };
>   int fuse_register_kio(struct fuse_kio_ops *ops);
>   void fuse_unregister_kio(struct fuse_kio_ops *ops);
> diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c
> index 8c34c60eb79a..6df2ccee9a1f 100644
> --- a/fs/fuse/kio/pcs/pcs_cluster_core.c
> +++ b/fs/fuse/kio/pcs/pcs_cluster_core.c
> @@ -162,6 +162,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
>   	cc->nilbuffer_kv.iov_len = sizeof(cc->nilbuffer);
>   
>   	pcs_csset_init(&cc->css);
> +	pcs_mrset_init(&cc->mrs);
> +	pcs_krpcset_init(&cc->krpcs);
>   
>   	err = pcs_mapset_init(cc);
>   	if (err)
> @@ -210,6 +212,8 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
>   
>   void pcs_cc_fini(struct pcs_cluster_core *cc)
>   {
> +	pcs_krpcset_fini(&cc->krpcs);
> +	pcs_mrset_fini(&cc->mrs);
>   	pcs_csset_fini(&cc->css);
>   	pcs_mapset_fini(&cc->maps);
>   	pcs_rpc_engine_fini(&cc->eng);
> diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
> index 6cc97aa3f9d0..a48c3fbfa5b7 100644
> --- a/fs/fuse/kio/pcs/pcs_cs.c
> +++ b/fs/fuse/kio/pcs/pcs_cs.c
> @@ -25,7 +25,6 @@
>   #include "log.h"
>   #include "fuse_ktrace.h"
>   #include "pcs_net_addr.h"
> -
>   /* Lock order: cs->lock -> css->lock (lru, hash, bl_list) */
>   
>   
> @@ -39,22 +38,9 @@ struct pcs_rpc_params cn_rpc_params = {
>   	.flags			= 0,
>   };
>   
> -static void cs_aborting(struct pcs_rpc *ep, int error);
> -static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
> -static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
> -static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
> -static void cs_connect(struct pcs_rpc *ep);
>   static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
>   static void pcs_cs_destroy(struct pcs_cs *cs);
>   
> -struct pcs_rpc_ops cn_rpc_ops = {
> -	.demux_request		= cs_input,
> -	.get_hdr		= cs_get_hdr,
> -	.state_change		= cs_aborting,
> -	.keep_waiting		= cs_keep_waiting,
> -	.connect		= cs_connect,
> -};
> -
>   static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
>   {
>   	seqlock_init(&cs->stat.seqlock);
> @@ -124,8 +110,7 @@ static void pcs_cs_percpu_stat_free(struct pcs_cs *cs)
>   	free_percpu(cs->stat.iolat);
>   }
>   
> -struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
> -			     struct pcs_cluster_core *cc)
> +struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css)
>   {
>   	struct pcs_cs *cs;
>   
> @@ -153,13 +138,6 @@ struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css,
>   	    kfree(cs);
>   	    return NULL;
>   	}
> -	cs->rpc = pcs_rpc_create(&cc->eng, &cn_rpc_params, &cn_rpc_ops);
> -	if (cs->rpc == NULL) {
> -		pcs_cs_percpu_stat_free(cs);
> -		kfree(cs);
> -		return NULL;
> -	}
> -	cs->rpc->private = cs;
>   	INIT_LIST_HEAD(&cs->map_list);
>   	return cs;
>   }
> @@ -222,6 +200,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
>   		/* If rpc is connected, leave it connected until failure.
>   		 * After current connect fails, reconnect will be done to new address
>   		 */
> +		struct pcs_rpc *rpc = cs->rpc;
>   		if (addr) {
>   			if (addr->type != PCS_ADDRTYPE_NONE) {
>   				if (pcs_netaddr_cmp(&cs->addr, addr)) {
> @@ -231,7 +210,7 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
>   					FUSE_KTRACE(cc_from_csset(csset)->fc,
>   						    "Port change CS" NODE_FMT " seq=%d",
>   						    NODE_ARGS(*id), cs->addr_serno);
> -					pcs_rpc_set_address(cs->rpc, addr);
> +					pcs_rpc_set_address(rpc, addr);
>   
>   					if (!(flags & CS_FL_INACTIVE)) {
>   						pcs_map_notify_addr_change(cs);
> @@ -246,30 +225,31 @@ struct pcs_cs *pcs_cs_find_create(struct pcs_cs_set *csset, PCS_NODE_ID_T *id, P
>   			}
>   		}
>   		if (flags & CS_FL_LOCAL_SOCK)
> -			cs->rpc->flags |= PCS_RPC_F_LOCAL;
> +			rpc->flags |= PCS_RPC_F_LOCAL;
>   		else
> -			cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
> +			rpc->flags &= ~PCS_RPC_F_LOCAL;
>   		return cs;
>   	}
>   	BUG_ON(addr == NULL);
>   
> -	cs = pcs_cs_alloc(csset, cc_from_csset(csset));
> +	cs = pcs_cs_alloc(csset);
>   	if (!cs)
>   		return NULL;
>   
> +	cs->rpc = pcs_rpc_clnt_create(&cc_from_csset(csset)->eng, id, addr, flags);
> +
> +	if (!cs->rpc) {
> +		pcs_cs_percpu_stat_free(cs);
> +		kfree(cs);
> +		return NULL;
> +	}
> +	cs->rpc->clnt_cs = cs;
> +
>   	cs->id = *id;
>   
>   	cs->addr = *addr;
>   	cs->addr_serno = 1;
>   
> -	pcs_rpc_set_peer_id(cs->rpc, id, PCS_NODE_ROLE_CS);
> -	pcs_rpc_set_address(cs->rpc, addr);
> -
> -	if (flags & CS_FL_LOCAL_SOCK)
> -		cs->rpc->flags |= PCS_RPC_F_LOCAL;
> -	else
> -		cs->rpc->flags &= ~PCS_RPC_F_LOCAL;
> -
>   	spin_lock(&cs->lock);
>   	spin_lock(&csset->lock);
>   	if (__lookup_cs(csset, id)) {
> @@ -455,46 +435,7 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
>   	}
>   }
>   
> -static void cs_connect(struct pcs_rpc *ep)
> -{
> -	void (*connect_start)(struct pcs_rpc *);
> -
> -	if (ep->flags & PCS_RPC_F_LOCAL) {
> -		char path[128];
> -
> -		snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
> -			 (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
> -
> -		if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
> -			TRACE("Path to local socket is too long: %s", path);
> -
> -			ep->flags &= ~PCS_RPC_F_LOCAL;
> -			goto fail;
> -		}
> -		memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
> -		ep->sh.sun.sun_family = AF_UNIX;
> -		ep->sh.sa_len = sizeof(struct sockaddr_un);
> -		strcpy(ep->sh.sun.sun_path, path);
> -		connect_start = pcs_sockconnect_start;
> -	} else {
> -		/* TODO: print sock addr using pcs_format_netaddr() */
> -		if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
> -			TRACE("netaddr to sockaddr failed");
> -			goto fail;
> -		}
> -		connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
> -			pcs_rdmaconnect_start : pcs_sockconnect_start;
> -	}
> -	ep->state = PCS_RPC_CONNECT;
> -	connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
> -	return;
> -fail:
> -	pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
> -	pcs_rpc_reset(ep);
> -	return;
> -}
> -
> -static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
> +struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
>   {
>   	struct pcs_msg *msg, *resp;
>   	struct pcs_rpc_hdr *req_h;
> @@ -887,7 +828,7 @@ void pcs_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
>   	do_cs_submit(cs, ireq);
>   }
>   
> -static void handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
> +void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h)
>   {
>   	struct pcs_cs *who;
>   
> @@ -945,10 +886,10 @@ static int may_reroute(struct pcs_cs_list *csl, PCS_NODE_ID_T cs_id)
>   	return legit;
>   }
>   
> -static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
> +void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
>   {
>   	struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
> -	struct pcs_cs *cs = ep->private;
> +	struct pcs_cs *cs = ep->clnt_cs;
>   	struct pcs_cs *who;
>   
>   	/* Some CS reported it cannot complete local IO in time, close congestion window */
> @@ -1003,21 +944,6 @@ static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_
>   
>   }
>   
> -static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg)
> -{
> -	struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
> -
> -	switch (h->type) {
> -	case PCS_CS_CONG_NOTIFY:
> -		handle_congestion(ep->private, h);
> -		msg->done(msg);
> -		return 0;
> -	default:
> -		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
> -		return PCS_ERR_PROTOCOL;
> -	}
> -}
> -
>   void pcs_cs_notify_error(struct pcs_cluster_core *cc, pcs_error_t *err)
>   {
>   	struct list_head queue;
> @@ -1094,17 +1020,13 @@ static void pcs_cs_destroy(struct pcs_cs *cs)
>   	BUG_ON(cs->csa_ctx);
>   
>   	if (cs->rpc) {
> -		pcs_rpc_close(cs->rpc);
> +		cs->rpc->clnt_cs = NULL;
> +		pcs_rpc_clnt_close(cs->rpc);
>   		cs->rpc = NULL;
>   	}
>   	call_rcu(&cs->rcu, cs_destroy_rcu);
>   }
>   
> -void cs_aborting(struct pcs_rpc *ep, int error)
> -{
> -	pcs_rpc_reset(ep);
> -}
> -
>   /* Latency is difficult value to use for any decisions.
>    * It is sampled at random, we do not know what is happening while
>    * we have no samples. For now we do the following: arriving samples
> diff --git a/fs/fuse/kio/pcs/pcs_cs.h b/fs/fuse/kio/pcs/pcs_cs.h
> index 61be99a54157..62b88f612b54 100644
> --- a/fs/fuse/kio/pcs/pcs_cs.h
> +++ b/fs/fuse/kio/pcs/pcs_cs.h
> @@ -159,8 +159,6 @@ unsigned int cs_get_avg_in_flight(struct pcs_cs *cs);
>   void pcs_csset_init(struct pcs_cs_set *css);
>   void pcs_csset_fini(struct pcs_cs_set *css);
>   
> -struct pcs_cs *pcs_cs_alloc(struct pcs_cs_set *css, struct pcs_cluster_core *cc);
> -
>   void cs_log_io_times(struct pcs_int_request *ireq, struct pcs_msg *resp, unsigned int max_iolat);
>   int pcs_cs_format_io_times(char *buf, int buflen, struct pcs_int_request *ireq, struct pcs_msg *resp);
>   void cs_set_io_times_logger(void (*logger)(struct pcs_int_request *ireq, struct pcs_msg *resp, u32 max_iolat, void *ctx), void *ctx);
> @@ -219,4 +217,7 @@ int pcs_csa_csl_write_submit_single(struct pcs_int_request * ireq, int idx);
>   void pcs_csa_relay_iotimes(struct pcs_int_request * ireq,  struct pcs_cs_iohdr * h, PCS_NODE_ID_T cs_id);
>   void pcs_csa_cs_detach(struct pcs_cs * cs);
>   
> +void cs_handle_congestion(struct pcs_cs *cs, struct pcs_rpc_hdr *h);
> +struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
> +void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
>   #endif /* _PCS_CS_H_ */
> diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> index 566dcb5f2f4c..a71cb7a9a0b6 100644
> --- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> +++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
> @@ -37,6 +37,8 @@
>   #include "fuse_ktrace.h"
>   #include "fuse_prometheus.h"
>   #include "pcs_net_addr.h"
> +#include "pcs_mr.h"
> +#include "pcs_krpc.h"
>   
>   unsigned int pcs_loglevel = LOG_TRACE;
>   module_param(pcs_loglevel, uint, 0644);
> @@ -71,6 +73,10 @@ unsigned int rdmaio_queue_depth = 8;
>   module_param(rdmaio_queue_depth, uint, 0644);
>   MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
>   
> +bool pcs_krpc_support = true;
> +module_param(pcs_krpc_support, bool, 0444);
> +MODULE_PARM_DESC(pcs_krpc_support, "krpc support");
> +
>   void (*fuse_printk_plugin)(unsigned long, const char *, ...);
>   EXPORT_SYMBOL(fuse_printk_plugin);
>   
> @@ -189,7 +195,7 @@ static void process_pcs_init_reply(struct fuse_mount *fm, struct fuse_args *args
>   	fuse_ktrace_setup(fc);
>   	fc->ktrace_level = LOG_TRACE;
>   
> -	printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT "\n",
> +	printk("FUSE: kio_pcs: cl: " CLUSTER_ID_FMT ", clientid: " NODE_FMT,
>   	       CLUSTER_ID_ARGS(info->cluster_id), NODE_ARGS(info->node_id));
>   
>   	spin_lock(&fc->lock);
> @@ -1864,6 +1870,84 @@ static int kpcs_ioctl(struct file *file, struct inode *inode, unsigned int cmd,
>   	return res;
>   }
>   
> +static int kpcs_dev_ioctl(struct fuse_conn *fc, unsigned int cmd, unsigned long arg, int len)
> +{
> +	struct pcs_fuse_cluster *pfc = fc->kio.ctx;
> +	struct pcs_cluster_core *cc = &pfc->cc;
> +	int res;
> +
> +	switch (cmd) {
> +	case PCS_IOC_KRPC_CREATE:
> +	{
> +		struct pcs_ioc_krpc_create req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		if (pcs_krpc_lookup(&cc->krpcs, &req.id))
> +			return -EEXIST;
> +
> +		res = pcs_krpc_create(&cc->krpcs, &req.id, &req.addr, req.cs_flags);
> +		break;
> +	}
> +	case PCS_IOC_KRPC_UPDATE_ADDR:
> +	{
> +		struct pcs_ioc_krpc_create req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_krpc_update_addr(&cc->krpcs, &req.id, &req.addr, req.cs_flags);
> +		break;
> +	}
> +
> +	case PCS_IOC_KRPC_CONNECT:
> +	{
> +		struct pcs_ioc_krpc_connect req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_krpc_connect(&cc->krpcs, &req.id);
> +		break;
> +	}
> +	case PCS_IOC_KRPC_DESTROY:
> +	{
> +		struct pcs_ioc_krpc_destroy req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_krpc_destroy(&cc->krpcs, &req.id);
> +		break;
> +	}
> +	case PCS_IOC_REG_MR:
> +	{
> +		struct pcs_ioc_reg_mr req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_reg_mr(&cc->mrs, req.start, req.len);
> +		break;
> +	}
> +	case PCS_IOC_DEREG_MR:
> +	{
> +		struct pcs_ioc_dereg_mr req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_dereg_mr(&cc->mrs, req.id);
> +		break;
> +	}
> +	default:
> +		res = -ENOIOCTLCMD;
> +		break;
> +	}
> +	return res;
> +}
> +
>   static struct fuse_kio_ops kio_pcs_ops = {
>   	.name		= "pcs",
>   	.owner		= THIS_MODULE,
> @@ -1880,6 +1964,7 @@ static struct fuse_kio_ops kio_pcs_ops = {
>   	.inode_release	= kpcs_inode_release,
>   	.kill_requests	= kpcs_kill_requests,
>   	.ioctl		= kpcs_ioctl,
> +	.dev_ioctl  = kpcs_dev_ioctl,
>   };
>   
>   
> @@ -1920,10 +2005,13 @@ static int __init kpcs_mod_init(void)
>   	if (pcs_csa_init())
>   		goto free_cleanup_wq;
>   
> +	if (pcs_krpc_init())
> +		goto free_csa;
> +
>   	fast_path_version = PCS_FAST_PATH_VERSION.full;
>   
>   	if (fuse_register_kio(&kio_pcs_ops))
> -		goto free_csa;
> +		goto free_krpc;
>   
>   	/* Clone relay_file_operations to set ownership */
>   	ktrace_file_operations = relay_file_operations;
> @@ -1935,10 +2023,13 @@ static int __init kpcs_mod_init(void)
>   	if (IS_ERR(crc_tfm))
>   		crc_tfm = NULL;
>   
> +
>   	printk("%s fuse_c:%p ireq_c:%p pcs_wq:%p\n", __FUNCTION__,
>   	       pcs_fuse_req_cachep, pcs_ireq_cachep, pcs_wq);
>   
>   	return 0;
> +free_krpc:
> +	pcs_krpc_fini();
>   free_csa:
>   	pcs_csa_fini();
>   free_cleanup_wq:
> @@ -1968,6 +2059,7 @@ static void __exit kpcs_mod_exit(void)
>   	kmem_cache_destroy(pcs_ireq_cachep);
>   	kmem_cache_destroy(pcs_fuse_req_cachep);
>   	pcs_csa_fini();
> +	pcs_krpc_fini();
>   }
>   
>   module_init(kpcs_mod_init);
> diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
> index 8e55be02c654..3c000373745b 100644
> --- a/fs/fuse/kio/pcs/pcs_ioctl.h
> +++ b/fs/fuse/kio/pcs/pcs_ioctl.h
> @@ -123,4 +123,33 @@ struct pcs_csa_setmap
>   #define PCS_CSA_IOC_SETMAP	_IOR('V',38, struct pcs_csa_setmap)
>   #define PCS_KIO_CALL_REG	_IOR('V',39, struct fuse_pcs_ioc_register)
>   
> +struct pcs_ioc_reg_mr {
> +	u64	start;
> +	u64	len;
> +};
> +#define PCS_IOC_REG_MR  _IOR('V', 40, struct pcs_ioc_reg_mr)
> +
> +struct pcs_ioc_dereg_mr {
> +	u32 id;
> +};
> +#define PCS_IOC_DEREG_MR  _IOR('V', 41, struct pcs_ioc_dereg_mr)
> +
> +struct pcs_ioc_krpc_create {
> +	PCS_NODE_ID_T	id;
> +	PCS_NET_ADDR_T	addr;
> +	int				cs_flags;
> +};
> +#define PCS_IOC_KRPC_CREATE	_IOR('V', 42, struct pcs_ioc_krpc_create)
> +#define PCS_IOC_KRPC_UPDATE_ADDR _IOR('V', 43, struct pcs_ioc_krpc_create)
> +
> +struct pcs_ioc_krpc_connect {
> +	PCS_NODE_ID_T	id;
> +};
> +#define PCS_IOC_KRPC_CONNECT _IOR('V', 44, struct pcs_ioc_krpc_connect)
> +
> +struct pcs_ioc_krpc_destroy {
> +	PCS_NODE_ID_T	id;
> +};
> +#define PCS_IOC_KRPC_DESTROY _IOR('V', 45, struct pcs_ioc_krpc_destroy)
> +
>   #endif /* _PCS_IOCTL_H_ */
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> new file mode 100644
> index 000000000000..0e49dc227dca
> --- /dev/null
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -0,0 +1,794 @@
> +/*
> + *  fs/fuse/kio/pcs/pcs_krpc.c
> + *
> + *  Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
> + *
> + */
> +
> +#include <linux/types.h>
> +#include <linux/list.h>
> +#include <linux/rbtree.h>
> +#include <linux/refcount.h>
> +#include <linux/file.h>
> +#include <linux/anon_inodes.h>
> +
> +#include "pcs_types.h"
> +#include "pcs_cluster.h"
> +#include "pcs_cs_prot.h"
> +#include "pcs_req.h"
> +#include "pcs_krpc.h"
> +
> +struct kmem_cache *krpc_req_cachep;
> +
> +static int krpc_completion_post(struct pcs_krpc *krpc, struct krpc_completion *comp)
> +__releases(krpc->lock)
> +{
> +	/* post complete only when connected */
> +	if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
> +		spin_unlock(&krpc->lock);
> +		return -1;
> +	}
> +	list_add_tail(&comp->link, &krpc->completion_queue);
> +	krpc->nr_completion++;
> +	spin_unlock(&krpc->lock);
> +
> +	wake_up_poll(&krpc->poll_wait, EPOLLIN);
> +	return 0;
> +}
> +
> +static struct krpc_req *krpc_req_alloc(void)
> +{
> +	struct krpc_req *kreq;
> +
> +	kreq = kmem_cache_alloc(krpc_req_cachep, GFP_NOIO);
> +	if (!kreq)
> +		return NULL;
> +
> +	spin_lock_init(&kreq->lock);
> +	kreq->flags = 0;
> +	memset(&kreq->completion, 0, sizeof(struct krpc_completion));
> +
> +	return kreq;
> +}
> +
> +static void krpc_req_free(struct krpc_req *kreq)
> +{
> +	kmem_cache_free(krpc_req_cachep, kreq);
> +}
> +
> +static void krpc_completion_free(struct krpc_completion *comp)
> +{
> +	if (comp->private)
> +		krpc_req_free((struct krpc_req *)comp->private);
> +	else
> +		kfree(comp);
> +}
> +
> +static void krpc_req_complete(struct krpc_req *kreq, int error)
> +{
> +	struct krpc_completion *comp = &kreq->completion;
> +	struct pcs_krpc *krpc = kreq->krpc;
> +	int i;
> +
> +	BUG_ON(!comp->xid);
> +
> +	comp->result = error;
> +
> +	/* kunmap(kreq->hdr_buf); */
> +	pcs_mr_put(kreq->hdr_chunk.mr);
> +
> +	for (i = 0; i < kreq->nr_data_chunks; i++) {
> +		struct krpc_chunk *chunk;
> +
> +		chunk = &kreq->data_chunks[i];
> +		if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
> +			pcs_umem_release(chunk->umem);
> +		else
> +			pcs_mr_put(chunk->mr);
> +	}
> +
> +	if (kreq->data_chunks != &kreq->inline_data_chunks[0])
> +		kfree(kreq->data_chunks);
> +
> +	spin_lock(&krpc->lock);
> +	list_del(&kreq->link);
> +
> +	if (kreq->flags & KRPC_REQ_F_ABORTED) {
> +		krpc_req_free(kreq);
> +		spin_unlock(&krpc->lock);
> +	} else if (krpc_completion_post(krpc, comp))
> +		krpc_req_free(kreq);
> +
> +	pcs_krpc_put(krpc);
> +}
> +
> +static void krpc_msg_get_response_iter(struct pcs_msg *msg, int offset,
> +			struct iov_iter *it, unsigned int direction)
> +{
> +	struct pcs_msg *req = msg->private;
> +	struct krpc_req *kreq = req->private2;
> +
> +	/* No data payload */
> +	if (!(kreq->flags & KRPC_REQ_F_RESP_BUFF)) {
> +		/* No data payload */
> +		BUG_ON(msg->size > PAGE_SIZE);
> +
> +		kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
> +		kreq->hdr_kv.iov_len = msg->size;
> +
> +		iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, msg->size);
> +		iov_iter_advance(it, offset);
> +	} else {
> +		/* with data payload */
> +		int hdr_size = sizeof(struct pcs_cs_iohdr);
> +
> +		if (offset < hdr_size) {
> +			kreq->hdr_kv.iov_base = (void *)kreq->hdr_buf;
> +			kreq->hdr_kv.iov_len = hdr_size;
> +			iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, hdr_size);
> +			iov_iter_advance(it, offset);
> +		} else {
> +			//offset -= (unsigned int) sizeof(struct pcs_cs_iohdr);
> +			BUG_ON((offset - hdr_size) > kreq->data_len);
> +			BUG_ON(msg->size - hdr_size > kreq->data_len);
> +
> +			iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
> +						kreq->nr_data_bvecs, kreq->data_len);
> +			iov_iter_truncate(it, msg->size - hdr_size);
> +			iov_iter_advance(it, offset - hdr_size);
> +		}
> +	}
> +}
> +
> +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
> +{
> +	struct pcs_msg *msg, *resp;
> +	struct pcs_rpc_hdr *req_h;
> +	struct krpc_req *kreq;
> +
> +	if (!RPC_IS_RESPONSE(h->type))
> +		return NULL;
> +
> +	msg = pcs_rpc_lookup_xid(ep, &h->xid);
> +	if (msg == NULL)
> +		return NULL;
> +
> +	req_h = (struct pcs_rpc_hdr *)msg_inline_head(msg);
> +	if (req_h->type != (h->type & ~PCS_RPC_DIRECTION))
> +		return NULL;
> +
> +	kreq = msg->private2;
> +
> +	resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
> +	if (!resp)
> +		return NULL;
> +
> +	memcpy(resp->_inline_buffer, h, sizeof(struct pcs_rpc_hdr));
> +	memcpy(kreq->hdr_buf, h, sizeof(struct pcs_rpc_hdr));
> +	resp->size = h->len;
> +	resp->private = msg;
> +	resp->get_iter = krpc_msg_get_response_iter;
> +	resp->done = rpc_work_input;
> +	pcs_msg_del_calendar(msg);
> +
> +	return resp;
> +}
> +
> +/*
> + * All request-type messages from the CS are forwarded to userspace directly. The
> + * PCS KRPC layer cannot process these requests on its own.
> + */
> +static void krpc_handle_request(struct pcs_krpc *krpc, struct pcs_msg *msg)
> +{
> +	struct krpc_completion *comp;
> +
> +	if (krpc->state != PCS_KRPC_STATE_CONNECTED)
> +		return;
> +
> +	comp = kzalloc(sizeof(*comp) + msg->size, GFP_NOIO);
> +	if (!comp) {
> +		/* NOTE(review): allocation failure silently drops the request — confirm acceptable */
> +		return;
> +	}
> +
> +	memcpy(comp->_data_buf, msg->_inline_buffer, msg->size);
> +	comp->data_len = msg->size;
> +
> +	spin_lock(&krpc->lock);
> +	if (krpc_completion_post(krpc, comp))
> +		kfree(comp);
> +}
> +
> +/*
> + * Let userspace handle the cs_congestion_notify request.
> + */
> +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg)
> +{
> +	krpc_handle_request(ep->clnt_krpc, msg);
> +}
> +
> +/*
> + * Let userspace handle the keep_waiting request.
> + */
> +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
> +{
> +	krpc_handle_request(ep->clnt_krpc, msg);
> +}
> +
> +unsigned int pcs_krpc_hash(PCS_NODE_ID_T *id)
> +{
> +	return *(unsigned int *)id % PCS_KRPC_HASH_SIZE;
> +}
> +
> +static void krpc_msg_get_data(struct pcs_msg *msg, int offset,
> +		struct iov_iter *it, unsigned int direction)
> +{
> +	struct krpc_req *kreq = msg->private2;
> +	int hdr_size = pcs_krpc_msg_size(kreq->hdr_chunk.len, kreq->flags);
> +	int data_size = pcs_krpc_msg_size(kreq->data_len, kreq->flags);
> +	struct pcs_krpc *krpc = kreq->krpc;
> +
> +	if (offset < kreq->hdr_chunk.len) {
> +		iov_iter_kvec(it, direction, &kreq->hdr_kv, 1, kreq->hdr_chunk.len);
> +		iov_iter_advance(it, offset);
> +		return;
> +	}
> +
> +	if (offset < hdr_size) {
> +		iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
> +						 1, hdr_size - offset);
> +		return;
> +	}
> +
> +	if (offset < hdr_size + kreq->data_len) {
> +		iov_iter_bvec(it, direction, &kreq->data_bvecs[0],
> +				kreq->nr_data_bvecs, kreq->data_len);
> +		iov_iter_advance(it, offset - hdr_size);
> +		return;
> +	}
> +
> +	BUG_ON(offset >= hdr_size + data_size);
> +
> +	iov_iter_kvec(it, direction, &(cc_from_krpcset(krpc->krpcs))->nilbuffer_kv,
> +					1, (hdr_size + data_size - offset));
> +}
> +
> +static void pcs_krpc_response_done(struct pcs_msg *msg)
> +{
> +	struct krpc_req *kreq = msg->private2;
> +
> +	if (msg->rpc) {
> +		pcs_rpc_put(msg->rpc);
> +		msg->rpc = NULL;
> +	}
> +
> +	krpc_req_complete(kreq, msg->error.value);
> +}
> +
> +static void pcs_krpc_msg_sent(struct pcs_msg *msg)
> +{
> +	msg->done = pcs_krpc_response_done;
> +	if (pcs_if_error(&msg->error)) {
> +		msg->done(msg);
> +		return;
> +	}
> +	pcs_rpc_sent(msg);
> +}
> +
> +static int pcs_krpc_ioctl_recv_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_recvmsg *iocmsg)
> +{
> +	struct krpc_completion *comp;
> +	int res = 0;
> +
> +	spin_lock(&krpc->lock);
> +	if (list_empty(&krpc->completion_queue)) {
> +		spin_unlock(&krpc->lock);
> +		return 0;
> +	}
> +
> +	comp = list_first_entry(&krpc->completion_queue, struct krpc_completion, link);
> +	list_del(&comp->link);
> +	krpc->nr_completion--;
> +	spin_unlock(&krpc->lock);
> +
> +	if (comp->result) {
> +		res -=  comp->result;
> +		goto out;
> +	}
> +
> +	res = 1;
> +	iocmsg->xid = comp->xid;
> +	if (comp->xid == 0) {
> +		BUG_ON(!comp->data_len);
> +		BUG_ON(iocmsg->buf.len < comp->data_len);
> +		if (copy_to_user((void __user *)iocmsg->buf.addr, comp->_data_buf, comp->data_len))
> +			res = -EFAULT;
> +	}
> +
> +out:
> +	krpc_completion_free(comp);
> +	return res;
> +}
> +
> +static int pcs_krpc_ioctl_send_msg(struct pcs_krpc *krpc, struct pcs_krpc_ioc_sendmsg *iocmsg)
> +{
> +	struct krpc_req *kreq;
> +	struct pcs_msg *msg;
> +	struct pcs_krpc_buf_desc *chunk_bd;
> +	struct krpc_chunk *chunk;
> +	int res, i;
> +	struct bio_vec *bvec;
> +
> +	kreq = krpc_req_alloc();
> +	if (!kreq)
> +		return -ENOMEM;
> +
> +	if (iocmsg->nr_data_chunks > NR_KRPC_DATA_CHUNKS_INLINE) {
> +		kreq->data_chunks = kzalloc(iocmsg->nr_data_chunks, GFP_NOIO);
> +		if (!kreq->data_chunks) {
> +			res = -ENOMEM;
> +			goto err_free_kreq;
> +		}
> +	} else
> +		kreq->data_chunks = &kreq->inline_data_chunks[0];
> +
> +	/*
> +	 * The header buffer is exactly one page, used for staging the message header.
> +	 * It is also used for receiving the header of the response message.
> +	 */
> +	BUG_ON(iocmsg->hdr_chunk.addr & (PAGE_SIZE - 1));
> +
> +	chunk_bd = &iocmsg->hdr_chunk;
> +	chunk = &kreq->hdr_chunk;
> +
> +	chunk->addr = chunk_bd->addr;
> +	chunk->len = chunk_bd->len;
> +	chunk->type = KRPC_CHUNK_TYPE_MR;
> +
> +	chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, iocmsg->hdr_chunk.mr_id);
> +	BUG_ON(!chunk->mr);
> +
> +	kreq->hdr_buf = (char *) kmap(pcs_umem_page(chunk->mr->umem, chunk->addr));
> +	kreq->hdr_kv.iov_base = kreq->hdr_buf;
> +	kreq->hdr_kv.iov_len = chunk->len;
> +
> +	/* data chunk buf descriptors are placed at the end of the header buf and grow backwards */
> +	kreq->data_len = 0;
> +	kreq->nr_data_chunks  = 0;
> +	kreq->nr_data_bvecs  = 0;
> +
> +	chunk_bd = (struct pcs_krpc_buf_desc *)(kreq->hdr_buf + PAGE_SIZE) - 1;
> +	chunk = kreq->data_chunks;
> +	bvec = &kreq->data_bvecs[0];
> +
> +	for (i = 0; i < iocmsg->nr_data_chunks; i++) {
> +		struct page *page;
> +		u64 addr;
> +		int offset, len;
> +
> +		chunk->addr = chunk_bd->addr;
> +		chunk->len = chunk_bd->len;
> +		if (chunk_bd->mr_id) {
> +			chunk->mr = pcs_mr_get(&cc_from_krpc(krpc)->mrs, chunk_bd->mr_id);
> +			chunk->type = KRPC_CHUNK_TYPE_MR;
> +			if (!chunk->mr) {
> +				res = -ENXIO;
> +				goto err_free_data_chunk;
> +			}
> +		} else {
> +			/* unregistered memory buf */
> +			chunk->umem = pcs_umem_get(chunk->addr, chunk->len);
> +			if (IS_ERR(chunk->umem)) {
> +				res = PTR_ERR(chunk->umem);
> +				goto err_free_data_chunk;
> +			}
> +
> +			chunk->type = KRPC_CHUNK_TYPE_UMEM;
> +		}
> +
> +		/* build the bvecs for the data chunk */
> +		addr = chunk->addr;
> +		len = chunk->len;
> +
> +		while (len) {
> +			/* data bvec array overflow? */
> +			BUG_ON(kreq->nr_data_bvecs >= KRPC_MAX_DATA_PAGES);
> +
> +			if (chunk->type == KRPC_CHUNK_TYPE_MR)
> +				page = pcs_umem_page(chunk->mr->umem, addr);
> +			else
> +				page = pcs_umem_page(chunk->umem, addr);
> +
> +			BUG_ON(!page);
> +
> +			offset = addr & (PAGE_SIZE - 1);
> +
> +			bvec->bv_page = page;
> +			bvec->bv_offset =  offset;
> +
> +			bvec->bv_len = len < (PAGE_SIZE - offset) ? len : (PAGE_SIZE - offset);
> +
> +			addr += bvec->bv_len;
> +			len -= bvec->bv_len;
> +			bvec++;
> +			kreq->nr_data_bvecs++;
> +		}
> +		BUG_ON(len != 0);
> +
> +		kreq->data_len += chunk->len;
> +		chunk++;
> +		chunk_bd--;
> +		kreq->nr_data_chunks++;
> +	}
> +
> +	kreq->completion.xid = iocmsg->xid;
> +	kreq->completion.private = kreq;
> +
> +	kreq->flags = iocmsg->flags;
> +
> +	msg = &kreq->msg;
> +	msg->private = krpc;
> +	msg->private2 = kreq;
> +
> +	INIT_HLIST_NODE(&msg->kill_link);
> +	pcs_clear_error(&msg->error);
> +
> +	msg->size = iocmsg->msg_size;
> +	msg->timeout = iocmsg->timeout;
> +
> +	msg->rpc = NULL;
> +	msg->done = pcs_krpc_msg_sent;
> +	msg->get_iter = krpc_msg_get_data;
> +
> +	spin_lock(&krpc->lock);
> +	kreq->krpc = pcs_krpc_get(krpc);
> +	list_add_tail(&kreq->link, &krpc->pending_queue);
> +	spin_unlock(&krpc->lock);
> +	/* DTRACE to be added */
> +	pcs_rpc_queue(krpc->rpc, msg);
> +
> +	return 0;
> +
> +err_free_data_chunk:
> +	for (i = 0; i < kreq->nr_data_chunks; i++) {
> +		chunk = &kreq->data_chunks[i];
> +		if (chunk->type == KRPC_CHUNK_TYPE_UMEM)
> +			pcs_umem_release(chunk->umem);
> +		else
> +			pcs_mr_put(chunk->mr);
> +	}
> +	pcs_mr_put(kreq->hdr_chunk.mr);
> +
> +err_free_kreq:
> +	if (kreq->data_chunks != &kreq->inline_data_chunks[0])
> +		kfree(kreq->data_chunks);
> +	krpc_req_free(kreq);
> +	return res;
> +}
> +
> +/*
> + * Abort a connected krpc: mark all in-flight requests as aborted and
> + * throw away completions userspace has not yet collected.  Idempotent:
> + * does nothing unless the krpc is currently connected.
> + */
> +static int pcs_krpc_abort(struct pcs_krpc *krpc)
> +{
> +	struct list_head dispose_list;
> +	struct krpc_req *kreq;
> +	struct krpc_completion *comp;
> +
> +	INIT_LIST_HEAD(&dispose_list);
> +
> +	spin_lock(&krpc->lock);
> +
> +	if (krpc->state != PCS_KRPC_STATE_CONNECTED) {
> +		spin_unlock(&krpc->lock);
> +		return 0;
> +	}
> +
> +	krpc->state = PCS_KRPC_STATE_ABORTED;
> +
> +	/*
> +	 * Mark incompleted requests as aborted.  They stay on the pending
> +	 * queue: each still owns an in-flight RPC message and is torn down
> +	 * by its completion path.
> +	 *
> +	 * NOTE(review): the original spliced pending_queue onto
> +	 * dispose_list and looped on list_first_entry() without ever
> +	 * removing the entry — an infinite loop — and, had it terminated,
> +	 * the second loop would have freed krpc_req objects as if they
> +	 * were krpc_completion objects.
> +	 */
> +	list_for_each_entry(kreq, &krpc->pending_queue, link)
> +		kreq->flags |= KRPC_REQ_F_ABORTED;
> +
> +	/* take ownership of all unprocessed completions */
> +	list_splice_tail_init(&krpc->completion_queue, &dispose_list);
> +	krpc->nr_completion = 0;
> +
> +	spin_unlock(&krpc->lock);
> +
> +	/* dispose them outside the spinlock — nobody else can see them now */
> +	while (!list_empty(&dispose_list)) {
> +		comp = list_first_entry(&dispose_list, struct krpc_completion, link);
> +		list_del(&comp->link);
> +		krpc_completion_free(comp);
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * ioctl handler for the '[pcs_krpc]' anonymous fd: forward messages to
> + * the kernel RPC, fetch completions, or abort the connection.
> + */
> +static long pcs_krpc_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
> +{
> +	struct pcs_krpc *krpc = file->private_data;
> +	int res = 0;
> +
> +	switch (cmd) {
> +	case PCS_KRPC_IOC_SEND_MSG: {
> +		struct pcs_krpc_ioc_sendmsg req;
> +
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_krpc_ioctl_send_msg(krpc, &req);
> +		break;
> +	}
> +	case PCS_KRPC_IOC_RECV_MSG: {
> +		struct pcs_krpc_ioc_recvmsg  req;
> +
> +		/*
> +		 * Fix: read the request from userspace first.  The original
> +		 * passed the struct down uninitialized, so the receive
> +		 * buffer descriptor seen by pcs_krpc_ioctl_recv_msg() was
> +		 * stack garbage.
> +		 */
> +		if (copy_from_user(&req, (void __user *)arg, sizeof(req)))
> +			return -EFAULT;
> +
> +		res = pcs_krpc_ioctl_recv_msg(krpc, &req);
> +
> +		/* return the (possibly updated) xid/buffer info to userspace */
> +		if (copy_to_user((void __user *)arg, &req, sizeof(req)))
> +			return -EFAULT;
> +		break;
> +	}
> +	case PCS_KRPC_IOC_ABORT:
> +		res = pcs_krpc_abort(krpc);
> +		break;
> +	default:
> +		res = -ENOTTY;
> +		break;
> +	}
> +
> +	return res;
> +}
> +
> +/*
> + * poll handler: EPOLLERR once aborted; while connected the fd is always
> + * writable, and readable when a completion is queued.
> + */
> +static __poll_t pcs_krpc_poll(struct file *file, poll_table *wait)
> +{
> +	struct pcs_krpc *krpc = file->private_data;
> +	__poll_t mask = 0;
> +
> +	poll_wait(file, &krpc->poll_wait, wait);
> +
> +	spin_lock(&krpc->lock);
> +	switch (krpc->state) {
> +	case PCS_KRPC_STATE_ABORTED:
> +		mask = EPOLLERR;
> +		break;
> +	case PCS_KRPC_STATE_CONNECTED:
> +		mask = EPOLLOUT;
> +		if (!list_empty(&krpc->completion_queue))
> +			mask |= EPOLLIN;
> +		break;
> +	default:
> +		break;
> +	}
> +	spin_unlock(&krpc->lock);
> +
> +	return mask;
> +}
> +
> +/*
> + * Reset to initial state -- PCS_KRPC_STATE_UNCONN
> + *
> + * ->release of the '[pcs_krpc]' file: abort whatever is still queued,
> + * then make the krpc reusable by a future pcs_krpc_connect().  The
> + * krpc object itself stays alive — it is owned by the krpc set, not by
> + * the file.
> + */
> +static int pcs_krpc_release(struct inode *inode, struct file *file)
> +{
> +	struct pcs_krpc *krpc = file->private_data;
> +
> +	/* Just abort all pending requests and disconnect from the krpc */
> +	pcs_krpc_abort(krpc);
> +
> +	/* only ABORTED -> UNCONN; a concurrent destroy must not be undone */
> +	spin_lock(&krpc->lock);
> +	if (krpc->state == PCS_KRPC_STATE_ABORTED)
> +		krpc->state = PCS_KRPC_STATE_UNCONN;
> +	spin_unlock(&krpc->lock);
> +
> +	return 0;
> +}
> +
> +/* File operations of the '[pcs_krpc]' anonymous fd handed out by pcs_krpc_connect(). */
> +static const struct file_operations pcs_krpc_fops = {
> +	.owner = THIS_MODULE,
> +	.release = pcs_krpc_release,
> +	.poll	= pcs_krpc_poll,
> +	.unlocked_ioctl = pcs_krpc_ioctl,
> +	.llseek		= no_llseek,
> +};
> +
> +/*
> + * Find a krpc by peer node id; returns NULL if not present.
> + *
> + * NOTE(review): walks krpcs->ht without taking krpcs->lock — presumably
> + * callers serialize lookups against create/destroy; confirm.
> + */
> +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
> +{
> +	struct pcs_krpc *krpc;
> +
> +	hlist_for_each_entry(krpc, &krpcs->ht[pcs_krpc_hash(id)], hlist) {
> +		if (memcmp(&krpc->id, id, sizeof(krpc->id)) == 0)
> +			return krpc;
> +	}
> +	return NULL;
> +}
> +
> +/* Take an additional reference on @krpc; paired with pcs_krpc_put(). */
> +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc)
> +{
> +	refcount_inc(&krpc->refcnt);
> +	return krpc;
> +}
> +
> +/* Drop a reference on @krpc; the last put frees the object. */
> +void pcs_krpc_put(struct pcs_krpc *krpc)
> +{
> +	if (refcount_dec_and_test(&krpc->refcnt))
> +		kfree(krpc);
> +}
> +
> +/*
> + * Create a new pcs_krpc for peer @id, attach it to the shared kernel
> + * RPC endpoint, and insert it into @krpcs.  Returns 0 or -ENOMEM.
> + */
> +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
> +		PCS_NET_ADDR_T *addr, int cs_flags)
> +{
> +	struct pcs_krpc *krpc;
> +
> +	krpc = kzalloc(sizeof(struct pcs_krpc), GFP_NOIO);
> +	if (!krpc)
> +		return -ENOMEM;
> +
> +	INIT_HLIST_NODE(&krpc->hlist);
> +	INIT_LIST_HEAD(&krpc->link);
> +	spin_lock_init(&krpc->lock);
> +	refcount_set(&krpc->refcnt, 1);	/* reference owned by the set */
> +	INIT_LIST_HEAD(&krpc->pending_queue);
> +	INIT_LIST_HEAD(&krpc->completion_queue);
> +	init_waitqueue_head(&krpc->poll_wait);
> +
> +	krpc->krpcs = krpcs;
> +	krpc->id = *id;
> +	krpc->state = PCS_KRPC_STATE_UNCONN;
> +
> +	/* attach to (or create) the shared kernel RPC endpoint */
> +	krpc->rpc = pcs_rpc_clnt_create(&cc_from_krpcset(krpcs)->eng, id, addr, cs_flags);
> +	if (!krpc->rpc) {
> +		kfree(krpc);
> +		return -ENOMEM;
> +	}
> +
> +	krpc->rpc->clnt_krpc = krpc;
> +
> +	/*
> +	 * Publish the new krpc.  Only the set lock is needed: the object is
> +	 * not reachable by anyone else yet, so taking krpc->lock here (as
> +	 * the original did) protected nothing and only introduced an
> +	 * unnecessary krpc->lock -> krpcs->lock nesting order.
> +	 */
> +	spin_lock(&krpcs->lock);
> +	hlist_add_head(&krpc->hlist, &krpcs->ht[pcs_krpc_hash(&krpc->id)]);
> +	list_add_tail(&krpc->link, &krpcs->list);
> +	krpcs->nkrpc++;
> +	spin_unlock(&krpcs->lock);
> +
> +	return 0;
> +}
> +
> +/*
> + * Update the peer network address (and the local-socket flag) of an
> + * existing krpc's underlying RPC endpoint.  Returns -ENXIO when @id is
> + * unknown.
> + */
> +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
> +		PCS_NET_ADDR_T *addr, int flags)
> +{
> +	struct pcs_krpc *krpc;
> +
> +	krpc = pcs_krpc_lookup(krpcs, id);
> +	if (!krpc)
> +		return -ENXIO;
> +
> +	pcs_rpc_set_address(krpc->rpc, addr);
> +
> +	/* next (re)connect will pick the matching transport */
> +	if (flags & CS_FL_LOCAL_SOCK)
> +		krpc->rpc->flags |= PCS_RPC_F_LOCAL;
> +	else
> +		krpc->rpc->flags &= ~PCS_RPC_F_LOCAL;
> +
> +	return 0;
> +}
> +/*
> + * Connect to a pcs_krpc, return a valid fd on success.
> + */
> +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
> +{
> +	struct pcs_krpc *krpc;
> +	int fd;
> +	struct file *file;
> +
> +	krpc = pcs_krpc_lookup(krpcs, id);
> +	if (!krpc)
> +		return -ENXIO;
> +
> +	/* only one userspace connection at a time */
> +	WARN_ON(krpc->state != PCS_KRPC_STATE_UNCONN);
> +	if (krpc->state != PCS_KRPC_STATE_UNCONN)
> +		return -EPERM;
> +
> +	fd = get_unused_fd_flags(O_CLOEXEC);
> +	if (fd < 0)
> +		return fd;
> +
> +	file = anon_inode_getfile("[pcs_krpc]", &pcs_krpc_fops, krpc, 0);
> +	if (IS_ERR(file)) {
> +		put_unused_fd(fd);
> +		return PTR_ERR(file);
> +	}
> +
> +	/*
> +	 * The krpc is considered connected regardless of the state of the
> +	 * underlying RPC.  Set the state BEFORE fd_install(): once the fd
> +	 * is published userspace may immediately ioctl()/poll() the file,
> +	 * and with the original ordering it could still observe
> +	 * PCS_KRPC_STATE_UNCONN.
> +	 */
> +	krpc->state = PCS_KRPC_STATE_CONNECTED;
> +
> +	fd_install(fd, file);
> +	return fd;
> +}
> +
> +/*
> + * Unlink @krpc from its set, detach and close the underlying RPC, and
> + * drop the set's reference (the final kfree may happen later if other
> + * references are still held).
> + *
> + * NOTE(review): krpc->rpc is tested/cleared after krpc->lock has been
> + * released — presumably callers serialize destroy; confirm.
> + */
> +static void __pcs_krpc_destroy(struct pcs_krpc *krpc)
> +{
> +	spin_lock(&krpc->lock);
> +
> +	krpc->state = PCS_KRPC_STATE_DESTROYED;
> +
> +	/*Remove from krpc set*/
> +	spin_lock(&krpc->krpcs->lock);
> +	hlist_del(&krpc->hlist);
> +	list_del(&krpc->link);
> +	krpc->krpcs->nkrpc--;
> +	spin_unlock(&krpc->krpcs->lock);
> +
> +	spin_unlock(&krpc->lock);
> +
> +	if (krpc->rpc) {
> +		/* break the back-pointer before the RPC can be torn down */
> +		krpc->rpc->clnt_krpc = NULL;
> +		pcs_rpc_clnt_close(krpc->rpc);
> +		krpc->rpc = NULL;
> +	}
> +	pcs_krpc_put(krpc);
> +}
> +
> +/*
> + * Destroy the krpc identified by @id.  Unknown ids and krpcs that are
> + * still attached to userspace are reported with an error code instead
> + * of BUG_ON(): both are recoverable caller mistakes and must not panic
> + * the machine (WARN_ON keeps them visible).
> + */
> +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
> +{
> +	struct pcs_krpc *krpc;
> +
> +	krpc = pcs_krpc_lookup(krpcs, id);
> +	if (WARN_ON(!krpc))
> +		return -ENXIO;
> +	if (WARN_ON(krpc->state != PCS_KRPC_STATE_UNCONN))
> +		return -EBUSY;
> +
> +	__pcs_krpc_destroy(krpc);
> +	return 0;
> +}
> +
> +/* Initialize an empty krpc set: hash table, list, counter and lock. */
> +void pcs_krpcset_init(struct pcs_krpc_set *krpcs)
> +{
> +	unsigned int bkt;
> +
> +	spin_lock_init(&krpcs->lock);
> +	INIT_LIST_HEAD(&krpcs->list);
> +	krpcs->nkrpc = 0;
> +
> +	for (bkt = 0; bkt < PCS_KRPC_HASH_SIZE; bkt++)
> +		INIT_HLIST_HEAD(&krpcs->ht[bkt]);
> +}
> +
> +/*
> + * Tear down every krpc in the set.  The set lock is dropped around the
> + * abort/destroy calls because __pcs_krpc_destroy() re-takes it (and
> + * pcs_krpc_abort() takes krpc->lock); the entry is only removed from
> + * krpcs->list inside __pcs_krpc_destroy().
> + */
> +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs)
> +{
> +	spin_lock(&krpcs->lock);
> +	while (!list_empty(&krpcs->list)) {
> +		struct pcs_krpc *krpc;
> +
> +		krpc = list_first_entry(&krpcs->list, struct pcs_krpc, link);
> +		spin_unlock(&krpcs->lock);
> +		pcs_krpc_abort(krpc);
> +		__pcs_krpc_destroy(krpc);
> +		spin_lock(&krpcs->lock);
> +	}
> +	spin_unlock(&krpcs->lock);
> +
> +	/* sanity: destroy path must have emptied the set and the counter */
> +	BUG_ON(!list_empty(&krpcs->list));
> +	BUG_ON(krpcs->nkrpc != 0);
> +}
> +
> +/* Module-load setup: create the slab cache backing krpc_req objects. */
> +int __init pcs_krpc_init(void)
> +{
> +	krpc_req_cachep = kmem_cache_create("pcs_krpc_req",
> +					    sizeof(struct krpc_req), 0,
> +					    SLAB_RECLAIM_ACCOUNT|SLAB_ACCOUNT, NULL);
> +
> +	return krpc_req_cachep ? 0 : -ENOMEM;
> +}
> +
> +/* Module-unload counterpart of pcs_krpc_init(): drop the kreq cache. */
> +void pcs_krpc_fini(void)
> +{
> +	kmem_cache_destroy(krpc_req_cachep);
> +}
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.h b/fs/fuse/kio/pcs/pcs_krpc.h
> new file mode 100644
> index 000000000000..6a257ba80208
> --- /dev/null
> +++ b/fs/fuse/kio/pcs/pcs_krpc.h
> @@ -0,0 +1,140 @@
> +/*
> + *  fs/fuse/kio/pcs/pcs_krpc.h
> + *
> + *  Copyright (c) 2018-2021 Virtuozzo International GmbH. All rights reserved.
> + *
> + */
> +#ifndef _PCS_KRPC_H_
> +#define _PCS_KRPC_H_ 1
> +#include <linux/types.h>
> +#include <linux/bvec.h>
> +
> +#include "pcs_prot_types.h"
> +#include "pcs_perfcounters.h"
> +#include "pcs_rpc_clnt.h"
> +#include "pcs_krpc_prot.h"
> +#include "pcs_mr.h"
> +
> +/*
> + * One contiguous userspace buffer contributing to a message.  The
> + * backing pages are pinned either through a pre-registered MR or by an
> + * ad-hoc pcs_umem_get() pin; @type selects the active union member.
> + */
> +struct krpc_chunk {
> +	u64		addr;	/* userspace virtual address */
> +	u32		len;	/* length in bytes */
> +	u32		type;	/* KRPC_CHUNK_TYPE_* */
> +#define KRPC_CHUNK_TYPE_MR		0
> +#define KRPC_CHUNK_TYPE_UMEM	1
> +	union {
> +		struct pcs_mr *mr;	/* KRPC_CHUNK_TYPE_MR */
> +		struct pcs_umem *umem;	/* KRPC_CHUNK_TYPE_UMEM */
> +	};
> +};
> +
> +#define PCS_KRPC_HASH_SIZE	1024
> +/*
> + * Container of all pcs_krpc instances of a cluster core: a hash table
> + * keyed by peer node id for lookup plus a flat list for iteration.
> + */
> +struct pcs_krpc_set {
> +	/* Set of krpcs */
> +	struct hlist_head		ht[PCS_KRPC_HASH_SIZE];	/* indexed by pcs_krpc_hash(id) */
> +	struct list_head		list;	/* all krpcs, for teardown */
> +	unsigned int			nkrpc;	/* number of elements */
> +
> +	spinlock_t				lock;	/* protects ht, list, nkrpc */
> +};
> +
> +/* Lifecycle states of a pcs_krpc. */
> +enum {
> +	PCS_KRPC_STATE_UNCONN,		/* created, no userspace fd attached */
> +	PCS_KRPC_STATE_CONNECTED,	/* fd handed out by pcs_krpc_connect() */
> +	PCS_KRPC_STATE_ABORTED,		/* aborted; poll reports EPOLLERR */
> +	PCS_KRPC_STATE_DESTROYED,	/* unlinked from the set, being freed */
> +};
> +
> +/*
> + * Kernel-side peer of one userspace RPC connection: links the anonymous
> + * '[pcs_krpc]' file to the shared kernel pcs_rpc and carries the queues
> + * of in-flight requests and ready completions.
> + */
> +struct pcs_krpc {
> +	struct hlist_node		hlist;	/* in pcs_krpc_set::ht */
> +	struct list_head		link;	/* in pcs_krpc_set::list */
> +	spinlock_t				lock;	/* protects state and the queues below */
> +	refcount_t				refcnt;
> +
> +	struct pcs_rpc			*rpc;	/* underlying kernel RPC endpoint */
> +
> +	struct pcs_krpc_set		*krpcs;	/* owning set */
> +
> +	PCS_NODE_ID_T			id;	/* peer node id, the lookup key */
> +
> +	int state;	/* PCS_KRPC_STATE_* */
> +
> +	struct list_head		pending_queue;		/* krpc_req queued to the RPC */
> +	struct list_head		completion_queue;	/* krpc_completion for userspace */
> +	int nr_completion;	/* length of completion_queue */
> +
> +	/** Wait queue head for poll */
> +	wait_queue_head_t		poll_wait;
> +};
> +
> +/*
> + * Completion message to be received by userspace
> + */
> +struct krpc_completion {
> +	struct list_head	link;		/* in krpc->completion_queue */
> +
> +	u64			xid;		/* matches pcs_krpc_ioc_sendmsg.xid of the request */
> +	int			result;
> +
> +	void		*private;
> +	int			data_len;	/* bytes valid in _data_buf */
> +	/*
> +	 * Trailing payload: a C99 flexible array member replaces the
> +	 * original '[0]' zero-length array, which is a deprecated GNU
> +	 * extension in kernel code.
> +	 */
> +	u8			_data_buf[];
> +};
> +
> +#define KRPC_MAX_DATA_PAGES			256
> +#define NR_KRPC_DATA_CHUNKS_INLINE	4
> +/*
> + * Kernel-side state of one in-flight userspace request: the pinned user
> + * buffers (header + data chunks), the bio_vec view of their pages, and
> + * the embedded completion eventually handed back to userspace.
> + */
> +struct krpc_req {
> +	struct list_head	link;	/* in krpc->pending_queue */
> +	spinlock_t			lock;
> +	struct pcs_krpc		*krpc;	/* owner; holds a reference while queued */
> +
> +#define KRPC_REQ_F_ALIGNMENT	PCS_KRPC_MSG_F_ALIGNMENT
> +#define KRPC_REQ_F_RESP_BUFF	PCS_KRPC_MSG_F_RESP_BUFF	/* data buff is for read response */
> +#define KRPC_REQ_F_ABORTED		0x10000
> +	int flags;
> +
> +	struct pcs_msg		msg;	/* the RPC message queued to pcs_rpc */
> +
> +	char				*hdr_buf;
> +	struct kvec			hdr_kv;
> +	struct krpc_chunk	hdr_chunk;	/* pinned header buffer */
> +
> +	/* data_chunks points at inline_data_chunks unless more were needed */
> +	int					nr_data_chunks;
> +	struct krpc_chunk	*data_chunks;
> +	struct krpc_chunk	inline_data_chunks[NR_KRPC_DATA_CHUNKS_INLINE];
> +
> +	int data_len;		/* sum of all data chunk lengths */
> +	int nr_data_bvecs;	/* valid entries in data_bvecs */
> +	struct bio_vec data_bvecs[KRPC_MAX_DATA_PAGES];
> +
> +	struct krpc_completion completion;	/* filled when the reply arrives */
> +};
> +
> +/* Round @size up to the krpc message alignment when the flag asks for it. */
> +static inline u32 pcs_krpc_msg_size(u32 size, u8 flags)
> +{
> +	return (flags & PCS_KRPC_MSG_F_ALIGNMENT) ?
> +		ALIGN(size, PCS_KRPC_MSG_ALIGNMENT) : size;
> +}
> +
> +int pcs_krpc_init(void);
> +void pcs_krpc_fini(void);
> +
> +void pcs_krpcset_init(struct pcs_krpc_set *krpcs);
> +void pcs_krpcset_fini(struct pcs_krpc_set *krpcs);
> +
> +struct pcs_krpc *pcs_krpc_lookup(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
> +int pcs_krpc_create(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
> +		PCS_NET_ADDR_T *addr, int cs_flags);
> +int pcs_krpc_update_addr(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id,
> +		PCS_NET_ADDR_T *addr, int flags);
> +int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
> +int pcs_krpc_destroy(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id);
> +
> +void pcs_krpc_put(struct pcs_krpc *krpc);
> +struct pcs_krpc *pcs_krpc_get(struct pcs_krpc *krpc);
> +
> +struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
> +void krpc_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
> +void krpc_handle_congestion(struct pcs_rpc *ep, struct pcs_msg *msg);
> +#endif
> diff --git a/fs/fuse/kio/pcs/pcs_krpc_prot.h b/fs/fuse/kio/pcs/pcs_krpc_prot.h
> new file mode 100644
> index 000000000000..525f4e04d2cc
> --- /dev/null
> +++ b/fs/fuse/kio/pcs/pcs_krpc_prot.h
> @@ -0,0 +1,44 @@
> +#ifndef _PCS_KRPC_PROT_H_
> +#define _PCS_KRPC_PROT_H_
> +#include <linux/ioctl.h>
> +
> +#include "pcs_prot_types.h"
> +
> +/*Device ioctls: */
> +#define PCS_KRPC_IOC_MAGIC  255
> +
> +/* ioctl cmds supported by the '[pcs_krpc]' anonymous inode */
> +#define PCS_KRPC_IOC_SEND_MSG		_IO(PCS_KRPC_IOC_MAGIC, 10)
> +#define PCS_KRPC_IOC_RECV_MSG		_IO(PCS_KRPC_IOC_MAGIC, 11)
> +#define PCS_KRPC_IOC_ABORT			_IO(PCS_KRPC_IOC_MAGIC, 12)
> +
> +/*
> + * Userspace buffer descriptor.  mr_id == 0 means the buffer does not
> + * come from a registered MR and is pinned ad hoc by the kernel.
> + * NOTE(review): this header is consumed by userspace via ioctl —
> + * consider __u64/__u32 UAPI types instead of kernel-internal u64/u32.
> + */
> +struct pcs_krpc_buf_desc {
> +	u64  addr;		/* buf address in userspace. */
> +	u32  len;		/* size of the buf */
> +	u32  mr_id;		/* mr id; 0 for an unregistered buffer */
> +};
> +
> +#define PCS_KRPC_MSG_ALIGNMENT		(512ULL)
> +
> +#define PCS_KRPC_MSG_F_ALIGNMENT	0x01
> +#define PCS_KRPC_MSG_F_RESP_BUFF	0x02
> +/*
> + * To avoid copying, a message is passed to the kernel as an array of
> + * buffer descriptors; each descriptor points to a buffer holding one
> + * chunk of the message.
> + *
> + * NOTE(review): the nr_data_chunks descriptors are not part of this
> + * struct — presumably they are read from user memory adjacent to the
> + * ioctl argument; confirm against the userspace side and the
> + * chunk_bd-- iteration in pcs_krpc_ioctl_send_msg().
> + */
> +struct pcs_krpc_ioc_sendmsg {
> +	u64		xid;				/* context id */
> +	u32		msg_size;			/* total size of the msg */
> +	u16		timeout;			/* timeout */
> +	u8		flags;				/* PCS_KRPC_MSG_F_* */
> +	u8		nr_data_chunks;		/* total number of data chunks */
> +	struct pcs_krpc_buf_desc hdr_chunk;	/* the buf holding msg header */
> +};
> +
> +/* Argument of PCS_KRPC_IOC_RECV_MSG: where to place a pending completion. */
> +struct pcs_krpc_ioc_recvmsg {
> +	u64		xid;	/* context id */
> +	struct pcs_krpc_buf_desc buf;	/* for cs congestion notification and rpc keep waiting msg.*/
> +};
> +
> +#endif
> diff --git a/fs/fuse/kio/pcs/pcs_mr.c b/fs/fuse/kio/pcs/pcs_mr.c
> index ad55b0cbad8b..c2a2c072ba9e 100644
> --- a/fs/fuse/kio/pcs/pcs_mr.c
> +++ b/fs/fuse/kio/pcs/pcs_mr.c
> @@ -126,8 +126,10 @@ int pcs_reg_mr(struct pcs_mr_set *mrs, u64 start, u64 len)
>   	}
>   
>   	umem = pcs_umem_get(start, len);
> -	if (IS_ERR(umem))
> +	if (IS_ERR(umem)) {
> +		atomic_dec(&mrs->mr_num);
>   		return PTR_ERR(umem);
> +	}
>   
>   	mr = kzalloc(sizeof(*mr), GFP_KERNEL);
>   	if (!mr) {
> diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
> index cdf1c9754803..b88e56d2b9a2 100644
> --- a/fs/fuse/kio/pcs/pcs_req.h
> +++ b/fs/fuse/kio/pcs/pcs_req.h
> @@ -18,6 +18,8 @@
>   #include "fuse_ktrace_prot.h"
>   #include "fuse_stat.h"
>   #include "../../fuse_i.h"
> +#include "pcs_mr.h"
> +#include "pcs_krpc.h"
>   
>   ///////////////////////////
>   
> @@ -168,7 +170,7 @@ struct pcs_int_request
>   	union {
>   		struct {
>   			struct pcs_map_entry	*map;
> -			//// Temproraly disable flow
> +			/* Temproraly disable flow */
>   			struct pcs_flow_node	*flow;
>   			u8			cmd;
>   			u8			role;
> @@ -253,7 +255,7 @@ struct pcs_int_request
>   	};
>   };
>   
> -// FROM pcs_cluste_core.h
> +/* FROM pcs_cluste_core.h */
>   
>   struct pcs_clnt_config
>   {
> @@ -284,9 +286,15 @@ struct pcs_cluster_core
>   	struct pcs_map_set	maps;		/* Global map data */
>   	struct pcs_rpc_engine	eng;		/* RPC engine */
>   	struct workqueue_struct *wq;
> -////	struct pcs_ratelimit	rlim;		/* Rate limiter */
> -////	struct pcs_rng		rng;
> +#if 0
> +	struct pcs_ratelimit	rlim;		/* Rate limiter */
> +	struct pcs_rng		rng;
> +#endif
>   	/* <SKIP */
> +
> +	struct pcs_mr_set	mrs;		/* Table of all MRs*/
> +	struct pcs_krpc_set	krpcs;		/* Table of all KRPCs */
> +
>   	struct pcs_fuse_stat	stat;
>   
>   	struct {
> @@ -337,10 +345,20 @@ static inline struct pcs_cluster_core *cc_from_maps(struct pcs_map_set *maps)
>   	return container_of(maps, struct pcs_cluster_core, maps);
>   }
>   
> +/* Map a krpc set back to its embedding cluster core. */
> +static inline struct pcs_cluster_core *cc_from_krpcset(struct pcs_krpc_set *krpcs)
> +{
> +	return container_of(krpcs, struct pcs_cluster_core, krpcs);
> +}
> +
> +/* Map a krpc to its cluster core via the owning set. */
> +static inline struct pcs_cluster_core *cc_from_krpc(struct pcs_krpc *krpc)
> +{
> +	return cc_from_krpcset(krpc->krpcs);
> +}
> +
>   void pcs_cc_submit(struct pcs_cluster_core *cc, struct pcs_int_request* ireq);
>   void pcs_cc_requeue(struct pcs_cluster_core *cc, struct list_head * q);
>   void pcs_cc_update_storage_versions(struct pcs_cluster_core *cc, int version);
> -////// FROM pcs_cluster.h
> +/* FROM pcs_cluster.h */
>   static inline void pcs_sreq_attach(struct pcs_int_request * sreq, struct pcs_int_request * parent)
>   {
>   	sreq->completion_data.parent = parent;
> @@ -421,4 +439,6 @@ typedef void (*kio_req_itr)(struct fuse_file *ff, struct fuse_req *req,
>   void pcs_kio_req_list(struct fuse_conn *fc, kio_req_itr kreq_cb, void *ctx);
>   extern struct crypto_shash *crc_tfm;
>   
> +
> +
>   #endif /* _PCS_REQ_H_ */
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
> index 3b763cea9c01..deebc1dddf1b 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.c
> +++ b/fs/fuse/kio/pcs/pcs_rpc.c
> @@ -292,7 +292,8 @@ void pcs_rpc_attach_new_ep(struct pcs_rpc * ep, struct pcs_rpc_engine * eng)
>   	ep->peer_flags = 0;
>   	ep->peer_version = ~0U;
>   	ep->conn = NULL;
> -	ep->private = NULL;
> +	ep->clnt_cs = NULL;
> +	ep->clnt_krpc = NULL;
>   	INIT_LIST_HEAD(&ep->pending_queue);
>   	INIT_LIST_HEAD(&ep->state_queue);
>   	INIT_LIST_HEAD(&ep->input_queue);
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
> index ef4ab26b9d44..f7df01ef9a49 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.h
> +++ b/fs/fuse/kio/pcs/pcs_rpc.h
> @@ -153,7 +153,9 @@ struct pcs_rpc
>   	struct hlist_head	kill_calendar[RPC_MAX_CALENDAR];
>   	struct llist_node	cleanup_node;
>   
> -	struct pcs_cs *		private;
> +	struct pcs_cs	*clnt_cs;
> +	struct pcs_krpc	*clnt_krpc;
> +	int nr_clnts;
>   };
>   
>   struct pcs_rpc_engine
> diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.c b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
> new file mode 100644
> index 000000000000..11b7c3175bf5
> --- /dev/null
> +++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.c
> @@ -0,0 +1,188 @@
> +/*
> + *  fs/fuse/kio/pcs/pcs_rpc_clnt.c
> + *
> + *  Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
> + *
> + */
> +
> +#include <linux/types.h>
> +
> +#include "pcs_types.h"
> +#include "pcs_rpc.h"
> +#include "pcs_cluster.h"
> +#include "pcs_sock_io.h"
> +#include "pcs_sock_conn.h"
> +#include "pcs_rdma_conn.h"
> +#include "pcs_net_addr.h"
> +#include "log.h"
> +#include "fuse_ktrace.h"
> +#include "pcs_cs.h"
> +#include "pcs_krpc.h"
> +
> +/*
> + * Demultiplex an unsolicited message from the peer to the kernel CS
> + * client and/or the userspace krpc client sharing this endpoint.  Only
> + * congestion notifications are expected; anything else is a protocol
> + * error.
> + */
> +static int clnt_input(struct pcs_rpc *ep, struct pcs_msg *msg)
> +{
> +	struct pcs_rpc_hdr *h = (struct pcs_rpc_hdr *)msg->_inline_buffer;
> +
> +	switch (h->type) {
> +	case PCS_CS_CONG_NOTIFY:
> +		/* a congestion event concerns both clients, deliver to each */
> +		if (ep->clnt_cs)
> +			cs_handle_congestion(ep->clnt_cs, h);
> +
> +		if (ep->clnt_krpc)
> +			krpc_handle_congestion(ep, msg);
> +		return 0;
> +	default:
> +		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Unsupported message type %u", h->type);
> +		return PCS_ERR_PROTOCOL;
> +	}
> +}
> +
> +
> +/*
> + * Route an incoming header to the right client: xids whose origin field
> + * carries PCS_NODE_ALT_MASK go to the kernel CS path (cs_get_hdr), all
> + * others to the userspace krpc path (krpc_get_hdr).
> + */
> +static struct pcs_msg *clnt_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
> +{
> +	if (h->xid.origin.val & PCS_NODE_ALT_MASK)
> +		return cs_get_hdr(ep, h);
> +	else
> +		return krpc_get_hdr(ep, h);
> +}
> +
> +/*
> + * ->state_change callback: on abort simply reset the endpoint so it
> + * reconnects on next use.  @error is currently unused.  Made static for
> + * consistency — like every other callback in this file it is referenced
> + * only through clnt_rpc_ops.
> + */
> +static void clnt_aborting(struct pcs_rpc *ep, int error)
> +{
> +	pcs_rpc_reset(ep);
> +}
> +
> +/*
> + * ->keep_waiting callback: a keep-waiting notification refers to an
> + * earlier request @req, so route by the origin bit of the ORIGINAL
> + * request's xid, mirroring the demux in clnt_get_hdr().
> + */
> +static void clnt_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg)
> +{
> +	struct pcs_rpc_hdr *req_h = (struct pcs_rpc_hdr *)msg_inline_head(req);
> +
> +	if (req_h->xid.origin.val & PCS_NODE_ALT_MASK)
> +		cs_keep_waiting(ep, req, msg);
> +	else
> +		krpc_keep_waiting(ep, req, msg);
> +}
> +
> +/*
> + * ->connect callback: resolve the peer address into a sockaddr and kick
> + * off the transport handshake.  PCS_RPC_F_LOCAL peers use an AF_UNIX
> + * socket under PCS_SHM_DIR; everyone else gets TCP or RDMA depending on
> + * the address type.
> + */
> +static void clnt_connect(struct pcs_rpc *ep)
> +{
> +	void (*connect_start)(struct pcs_rpc *);
> +
> +	if (ep->flags & PCS_RPC_F_LOCAL) {
> +		char path[128];
> +
> +		snprintf(path, sizeof(path)-1, PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
> +			 (unsigned long long)ep->peer_id.val, CLUSTER_ID_ARGS(ep->eng->cluster_id));
> +
> +		/* path must fit into sun_path including the terminating NUL */
> +		if ((strlen(path) + 1) > sizeof(((struct sockaddr_un *) 0)->sun_path)) {
> +			TRACE("Path to local socket is too long: %s", path);
> +
> +			/* F_LOCAL is cleared: presumably the retry after the
> +			 * reset below uses the network address — confirm. */
> +			ep->flags &= ~PCS_RPC_F_LOCAL;
> +			goto fail;
> +		}
> +		memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
> +		ep->sh.sun.sun_family = AF_UNIX;
> +		ep->sh.sa_len = sizeof(struct sockaddr_un);
> +		strcpy(ep->sh.sun.sun_path, path);
> +		connect_start = pcs_sockconnect_start;
> +	} else {
> +		/* TODO: print sock addr using pcs_format_netaddr() */
> +		if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
> +			TRACE("netaddr to sockaddr failed");
> +			goto fail;
> +		}
> +		connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
> +			pcs_rdmaconnect_start : pcs_sockconnect_start;
> +	}
> +	ep->state = PCS_RPC_CONNECT;
> +	connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
> +	return;
> +
> +fail:
> +	pcs_rpc_report_error(ep, PCS_RPC_ERR_CONNECT_ERROR);
> +	pcs_rpc_reset(ep);
> +}
> +
> +/*
> + * Endpoint tuning for client RPC connections.
> + * NOTE(review): not static — confirm no other translation unit needs
> + * it; otherwise make it static const.
> + */
> +struct pcs_rpc_params clnt_rpc_params = {
> +	.alloc_hdr_size		= sizeof(struct pcs_rpc_hdr),
> +	.max_msg_size		= PCS_CS_MSG_MAX_SIZE,
> +	.holddown_timeout	= HZ,
> +	.connect_timeout	= 5*HZ,
> +	.response_timeout	= 30*HZ,
> +	.max_conn_retry		= 3,
> +	.flags			= 0,
> +};
> +
> +/*
> + * Ops table for RPC endpoints shared by the kernel CS client and
> + * userspace krpc clients; get_hdr/keep_waiting demux between the two.
> + */
> +struct pcs_rpc_ops clnt_rpc_ops = {
> +	.demux_request		= clnt_input,
> +	.get_hdr			= clnt_get_hdr,
> +	.state_change		= clnt_aborting,
> +	.connect			= clnt_connect,
> +	.keep_waiting		= clnt_keep_waiting,
> +};
> +
> +
> +/*
> + * Look up (by peer id) or create a client RPC endpoint that can be
> + * shared by the kernel CS client and userspace krpc clients.  Each
> + * successful call takes one client slot (ep->nr_clnts) plus one pcs_rpc
> + * reference; undo with pcs_rpc_clnt_close().  Returns NULL on
> + * allocation failure.
> + */
> +struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng, PCS_NODE_ID_T *peer_id,
> +				PCS_NET_ADDR_T *peer_addr, int flags)
> +{
> +	struct pcs_rpc *ep = NULL;
> +
> +	/*
> +	 * It's not expected this function to be called frequently,
> +	 * slow version of search is acceptable here.
> +	 */
> +	spin_lock(&eng->lock);
> +	hlist_for_each_entry(ep, &eng->unhashed, link) {
> +		if (memcmp(&ep->peer_id, peer_id, sizeof(ep->peer_id)) == 0) {
> +			pcs_rpc_get(ep);
> +			break;
> +		}
> +	}
> +	spin_unlock(&eng->lock);
> +
> +	if (ep) {
> +		mutex_lock(&ep->mutex);
> +		if (ep->state != PCS_RPC_DESTROY)
> +			goto found;
> +
> +		mutex_unlock(&ep->mutex);
> +
> +		/*
> +		 * The endpoint we found was already closed by its last
> +		 * owner.  Drop the reference taken under eng->lock before
> +		 * creating a fresh instance — the original leaked it here.
> +		 */
> +		pcs_rpc_put(ep);
> +	}
> +
> +	/* create a new pcs_rpc instance if found one had been closed by its last owner */
> +	ep = pcs_rpc_alloc_ep();
> +	if (!ep)
> +		return NULL;
> +
> +	pcs_rpc_attach_new_ep(ep, eng);
> +	pcs_rpc_configure_new_ep(ep, &clnt_rpc_params, &clnt_rpc_ops);
> +
> +	pcs_rpc_set_peer_id(ep, peer_id, PCS_NODE_ROLE_CS);
> +	pcs_rpc_set_address(ep, peer_addr);
> +
> +	if (flags & CS_FL_LOCAL_SOCK)
> +		ep->flags |= PCS_RPC_F_LOCAL;
> +	else
> +		ep->flags &= ~PCS_RPC_F_LOCAL;
> +
> +	mutex_lock(&ep->mutex);
> +found:
> +	ep->nr_clnts++;
> +	mutex_unlock(&ep->mutex);
> +
> +	return ep;
> +}
> +
> +/*
> + * Release one client's slot on a shared endpoint.  The last client
> + * aborts the connection and marks the endpoint DEAD/DESTROY; the
> + * pcs_rpc reference taken by pcs_rpc_clnt_create() is dropped in every
> + * case.
> + */
> +void pcs_rpc_clnt_close(struct pcs_rpc *ep)
> +{
> +	mutex_lock(&ep->mutex);
> +	/* double close or closing a passive endpoint is a logic error */
> +	BUG_ON(ep->flags & PCS_RPC_F_DEAD);
> +	BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
> +
> +	ep->nr_clnts--;
> +	if (!ep->nr_clnts) {
> +		/* close the rpc if we're the last rpc client */
> +		ep->flags |= PCS_RPC_F_DEAD;
> +		rpc_abort(ep, 1, PCS_ERR_NET_ABORT);
> +		ep->state = PCS_RPC_DESTROY;
> +	}
> +	mutex_unlock(&ep->mutex);
> +
> +	pcs_rpc_put(ep);
> +}
> diff --git a/fs/fuse/kio/pcs/pcs_rpc_clnt.h b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
> new file mode 100644
> index 000000000000..7afe59e9c992
> --- /dev/null
> +++ b/fs/fuse/kio/pcs/pcs_rpc_clnt.h
> @@ -0,0 +1,16 @@
> +/*
> + *  fs/fuse/kio/pcs/pcs_rpc_clnt.h
> + *
> + *  Copyright (c) 2018-2024 Virtuozzo International GmbH. All rights reserved.
> + *
> + */
> +
> +#ifndef _PCS_RPC_CLNT_H_
> +#define _PCS_RPC_CLNT_H_ 1
> +#include "pcs_rpc.h"
> +
> +struct pcs_rpc *pcs_rpc_clnt_create(struct pcs_rpc_engine *eng,
> +		PCS_NODE_ID_T *peer_id, PCS_NET_ADDR_T *peer_addr, int flags);
> +void pcs_rpc_clnt_close(struct pcs_rpc *ep);
> +
> +#endif


More information about the Devel mailing list