[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: implement support RDMA transport

Konstantin Khorenko khorenko at virtuozzo.com
Thu Oct 15 10:37:35 MSK 2020


The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 05f08453aa9ec285106433ed263d1c288d9430f8
Author: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Date:   Thu Oct 15 10:37:35 2020 +0300

    fs/fuse kio: implement support RDMA transport
    
    This patch adds support for RDMA transport for RPC.
    
    For RDMA read requests it is possible to enable/disable usage of a preallocated DMA memory region
    through sysfs '/sys/module/fuse_kio_pcs/parameters/rdmaio_use_dma_mr_for_rdma_rw'.
    
    This patch also provides the ability to enable mapping a list of msg pages
    to an RDMA memory region through sysfs '/sys/module/fuse_kio_pcs/parameters/rdmaio_use_map_for_mr',
    instead of copying pages to/from a preallocated and registered region.
    
    It is also possible to configure the RDMA IO queue depth through sysfs
    '/sys/module/fuse_kio_pcs/parameters/rdmaio_queue_depth'.
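    
    For example, the parameters could be set at module load time or toggled at
    runtime (illustrative values only):
    
        modprobe fuse_kio_pcs rdmaio_queue_depth=16
        echo 1 > /sys/module/fuse_kio_pcs/parameters/rdmaio_use_map_for_mr
        echo 0 > /sys/module/fuse_kio_pcs/parameters/rdmaio_use_dma_mr_for_rdma_rw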
    
    https://pmc.acronis.com/browse/VSTOR-4310
    
    Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
    
    +++
    fs/fuse kio: resolve name conflicts with ib_core
    
    Rename ib_mr_pool_{init,destroy,get,set} to avoid conflicts
    with names from include/rdma/mr_pool.h
    
    mFixes: 867c0e15fddf (fs/fuse kio: implement support RDMA transport)
    
    Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
    
    +++
    fs/fuse kio: add dependency on infiniband to Kconfig
    
    RDMA transport support was added in previous patches, but the
    dependency on INFINIBAND was missing.
    
    mFixes: 867c0e15fddf (fs/fuse kio: implement support RDMA transport)
    
    Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
    Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
---
 fs/fuse/Kconfig                    |    2 +-
 fs/fuse/Makefile                   |    5 +-
 fs/fuse/kio/pcs/pcs_cs.c           |   15 +-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |   31 +-
 fs/fuse/kio/pcs/pcs_rdma_conn.c    |  184 ++++
 fs/fuse/kio/pcs/pcs_rdma_conn.h    |    6 +
 fs/fuse/kio/pcs/pcs_rdma_io.c      | 1720 ++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_rdma_io.h      |  114 +++
 fs/fuse/kio/pcs/pcs_rdma_prot.h    |  119 +++
 fs/fuse/kio/pcs/pcs_rdma_rw.c      |  815 +++++++++++++++++
 fs/fuse/kio/pcs/pcs_rdma_rw.h      |  184 ++++
 11 files changed, 3181 insertions(+), 14 deletions(-)

diff --git a/fs/fuse/Kconfig b/fs/fuse/Kconfig
index f4d23ceabedc..732fb3aa6638 100644
--- a/fs/fuse/Kconfig
+++ b/fs/fuse/Kconfig
@@ -56,7 +56,7 @@ config FUSE_KIO_NULLIO
 
 config FUSE_KIO_PCS
 	tristate "Enable kdirect PCS io engine"
-	depends on FUSE_FS
+	depends on FUSE_FS && INFINIBAND && INFINIBAND_ADDR_TRANS
 	help
 	  This FUSE extension allows to forward io requests directly to PCS
 
diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 8ac6c4cfe114..ccc0855a3ea8 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -28,7 +28,10 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/fuse_io.o \
 	kio/pcs/fuse_stat.o \
 	kio/pcs/pcs_sock_conn.o \
-	kio/pcs/pcs_auth.o
+	kio/pcs/pcs_auth.o \
+	kio/pcs/pcs_rdma_io.o \
+	kio/pcs/pcs_rdma_rw.o \
+	kio/pcs/pcs_rdma_conn.o
 
 fuse-objs := dev.o dir.o file.o inode.o control.o xattr.o acl.o readdir.o
 virtiofs-y += virtio_fs.o
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 57b07df00f55..1529d06fdc8f 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -7,13 +7,13 @@
 #include "pcs_types.h"
 #include "pcs_sock_io.h"
 #include "pcs_rpc.h"
-#include "pcs_sock_io.h"
 #include "pcs_req.h"
 #include "pcs_map.h"
 #include "pcs_cs.h"
 #include "pcs_cs_prot.h"
 #include "pcs_cluster.h"
 #include "pcs_sock_conn.h"
+#include "pcs_rdma_conn.h"
 #include "pcs_ioctl.h"
 #include "log.h"
 #include "fuse_ktrace.h"
@@ -458,17 +458,12 @@ static void cs_connect(struct pcs_rpc *ep)
 		connect_start = pcs_sockconnect_start;
 	} else {
 		/* TODO: print sock addr using pcs_format_netaddr() */
-		if (ep->addr.type != PCS_ADDRTYPE_RDMA) {
-			if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
-				TRACE("netaddr to sockaddr failed");
-				goto fail;
-			}
-			connect_start = pcs_sockconnect_start;
-		} else {
-			WARN_ON_ONCE(1);
-			/* TODO: rdma connect init */
+		if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+			TRACE("netaddr to sockaddr failed");
 			goto fail;
 		}
+		connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
+			pcs_rdmaconnect_start : pcs_sockconnect_start;
 	}
 	ep->state = PCS_RPC_CONNECT;
 	connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 9a40434ba841..3205288da404 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -39,8 +39,30 @@ unsigned int debugfs_tracing = DEBUGFS_TRACE;
 module_param(debugfs_tracing, uint, 0644);
 MODULE_PARM_DESC(debugfs_tracing, "Enable/Disbale debugfs tracing");
 
+bool rdmaio_use_map_for_mr = false;
+module_param(rdmaio_use_map_for_mr, bool, 0644);
+MODULE_PARM_DESC(rdmaio_use_map_for_mr, "Enable/Disable mapping msg pages to RDMA MRs");
+
+bool rdmaio_use_dma_mr_for_rdma_rw = true;
+module_param(rdmaio_use_dma_mr_for_rdma_rw, bool, 0644);
+MODULE_PARM_DESC(rdmaio_use_dma_mr_for_rdma_rw,
+		 "Enable/Disable usage of DMA memory region for RDMA read/write requests");
+
+unsigned int rdmaio_cq_count = 0;
+module_param(rdmaio_cq_count, uint, 0644);
+MODULE_PARM_DESC(rdmaio_cq_count, "RDMA CQ count");
+
+unsigned int rdmaio_cq_period = 0;
+module_param(rdmaio_cq_period, uint, 0644);
+MODULE_PARM_DESC(rdmaio_cq_period, "RDMA CQ period in microseconds");
+
+unsigned int rdmaio_queue_depth = 8;
+module_param(rdmaio_queue_depth, uint, 0644);
+MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
+
 #ifdef CONFIG_DEBUG_KERNEL
-static int set_sockio_fail_percent(const char *val, const struct kernel_param *kp)
+
+static int set_io_fail_percent(const char *val, const struct kernel_param *kp)
 {
 	unsigned *p;
 	int rv;
@@ -57,10 +79,15 @@ static int set_sockio_fail_percent(const char *val, const struct kernel_param *k
 }
 
 u32 sockio_fail_percent;
-module_param_call(sockio_fail_percent, set_sockio_fail_percent,
+module_param_call(sockio_fail_percent, set_io_fail_percent,
 		  param_get_uint, &sockio_fail_percent, 0644);
 __MODULE_PARM_TYPE(sockio_fail_percent, "uint");
 MODULE_PARM_DESC(sockio_fail_percent, "Sock io failing rate in percents");
+
+bool rdmaio_io_failing = false;
+module_param(rdmaio_io_failing, bool, 0644);
+MODULE_PARM_DESC(rdmaio_io_failing, "Enable/Disable RDMA io failing");
+
 #endif
 
 static int fuse_ktrace_setup(struct fuse_conn * fc);
diff --git a/fs/fuse/kio/pcs/pcs_rdma_conn.c b/fs/fuse/kio/pcs/pcs_rdma_conn.c
new file mode 100644
index 000000000000..ff6225afbbfa
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_conn.c
@@ -0,0 +1,184 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/types.h>
+
+#include <rdma/rdma_cm.h>
+
+#include "pcs_types.h"
+#include "pcs_rdma_io.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "pcs_auth.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+#define RESOLVE_TIMEOUT_MS 5000
+
+enum {
+    RDMA_MAX_RESP_RES = 0xFF,
+    RDMA_MAX_INIT_DEPTH = 0xFF
+};
+
+struct pcs_rdmaconnect
+{
+	struct pcs_rpc *ep;
+
+	struct rdma_cm_id *cmid;
+	struct pcs_rdmaio *rio;
+
+	struct pcs_rdma_id id;
+
+	enum rdma_cm_event_type cm_event;
+	struct completion cm_done;
+};
+
+extern unsigned int rdmaio_queue_depth;
+
+static void
+conn_param_init(struct rdma_conn_param *cp, struct pcs_rdmaio_conn_req *cr)
+{
+	memset(cp, 0, sizeof(*cp));
+
+	if (cr) {
+		cp->private_data     = cr;
+		cp->private_data_len = sizeof(*cr);
+	}
+
+	/* these two guys are about RDMA reads: see man rdma_connect(3) */
+	cp->responder_resources = RDMA_MAX_RESP_RES;
+	cp->initiator_depth     = RDMA_MAX_INIT_DEPTH;
+
+	cp->flow_control        = 1; /* does not matter */
+	cp->retry_count         = 0; /* # retransmissions when no ACK received */
+	cp->rnr_retry_count     = 0; /* # RNR retransmissions */
+}
+
+static int pcs_rdma_cm_event_handler(struct rdma_cm_id *cmid,
+				     struct rdma_cm_event *event)
+{
+	struct pcs_rdma_id *id = cmid->context;
+	struct pcs_rdmaconnect *rc = container_of(id, struct pcs_rdmaconnect, id);
+	struct rdma_conn_param conn_param;
+
+	TRACE("event: %d, status: %d\n", event->event, event->status);
+
+	rc->cm_event = event->event;
+	switch (event->event) {
+		case RDMA_CM_EVENT_ADDR_RESOLVED:
+			if (rdma_resolve_route(cmid, RESOLVE_TIMEOUT_MS))
+				complete(&rc->cm_done);
+			break;
+		case RDMA_CM_EVENT_ROUTE_RESOLVED:
+			rc->rio = pcs_rdma_create(sizeof(struct pcs_rpc_hdr),
+						  rc->cmid, rdmaio_queue_depth, rc->ep);
+			if (!rc->rio) {
+				complete(&rc->cm_done);
+				break;
+			}
+			rc->cmid = NULL;
+
+			conn_param_init(&conn_param, &rc->rio->conn_req);
+			if (rdma_connect(cmid, &conn_param)) {
+				TRACE("rdma_connect failed: rio: 0x%p\n", rc->rio);
+				complete(&rc->cm_done);
+			}
+			break;
+		case RDMA_CM_EVENT_ESTABLISHED:
+			cmid->context = &rc->rio->id;
+			complete(&rc->cm_done);
+			break;
+		case RDMA_CM_EVENT_REJECTED:
+			TRACE("pcs_rdma_cm_event_handler reject: %s, rio: 0x%p\n",
+			      rdma_reject_msg(cmid, event->status), rc->rio);
+			complete(&rc->cm_done);
+			break;
+		default:
+			complete(&rc->cm_done);
+	}
+
+	return 0;
+}
+
+static int pcs_rdma_event_handler(struct rdma_cm_id *cmid,
+		struct rdma_cm_event *event)
+{
+	struct pcs_rdma_id *id = cmid->context;
+	return id->event_handler(cmid, event);
+}
+
+void pcs_rdmaconnect_start(struct pcs_rpc *ep)
+{
+	struct pcs_rdmaconnect rc = {};
+	struct sockaddr *sa = &ep->sh.sa;
+	int ret;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	TRACE("rdma connection start\n");
+
+	rc.ep = ep;
+	rc.id.event_handler = pcs_rdma_cm_event_handler;
+	init_completion(&rc.cm_done);
+
+	rc.cmid = rdma_create_id(&init_net, pcs_rdma_event_handler, &rc.id,
+				 RDMA_PS_TCP, IB_QPT_RC);
+	if (IS_ERR(rc.cmid)) {
+		TRACE("rdma_create_id failed: %ld\n", PTR_ERR(rc.cmid));
+		goto fail;
+	}
+
+	ret = rdma_resolve_addr(rc.cmid, NULL, sa, RESOLVE_TIMEOUT_MS);
+	if (ret) {
+		TRACE("rdma_resolve_addr failed: %d\n", ret);
+		goto fail_cm;
+	}
+
+	wait_for_completion(&rc.cm_done);
+	if (rc.cm_event != RDMA_CM_EVENT_ESTABLISHED) {
+		TRACE("rdma connection failed: %d\n", rc.cm_event);
+		goto fail_cm;
+	}
+
+	TRACE(PEER_FMT " state: %d, rio: 0x%p\n", PEER_ARGS(ep), ep->state, rc.rio);
+	cancel_delayed_work(&ep->timer_work);
+	ep->retries++;
+
+	ep->conn = &rc.rio->netio.ioconn;
+	rc.rio->send_timeout = PCS_SIO_TIMEOUT;
+	rc.rio->rio_state = RIO_STATE_ESTABLISHED;
+	rc.rio->netio.ioconn.destruct = pcs_rdma_ioconn_destruct;
+	rc.rio->netio.tops = &pcs_rdma_netio_tops;
+	rc.rio->netio.getmsg = rpc_get_hdr;
+	rc.rio->netio.eof = rpc_eof_cb;
+	if (ep->gc)
+		list_lru_add(&ep->gc->lru, &ep->lru_link);
+
+	if (ep->flags & PCS_RPC_F_CLNT_PEER_ID)
+		ep->flags |= PCS_RPC_F_PEER_ID;
+
+	ep->state = PCS_RPC_AUTH;
+	ret = rpc_client_start_auth(ep, PCS_AUTH_DIGEST,
+				    cc_from_rpc(ep->eng)->cluster_name);
+	if (ret < 0) {
+		TRACE("rdma authorization failed: %d, rio: 0x%p",
+		      ret, rc.rio);
+		goto fail; /* since ep->conn is initialized,
+			    * rio will be freed in pcs_rpc_reset()
+			    */
+	}
+
+	TRACE("rdma connection established: rio: 0x%p\n", rc.rio);
+
+	ep->state = PCS_RPC_APPWAIT;
+	pcs_rpc_enable(ep, 0);
+	return;
+
+fail_cm:
+	if (rc.rio)
+		pcs_rdma_destroy(rc.rio);
+	if (rc.cmid)
+		rdma_destroy_id(rc.cmid);
+fail:
+	pcs_rpc_reset(ep);
+	return;
+}
diff --git a/fs/fuse/kio/pcs/pcs_rdma_conn.h b/fs/fuse/kio/pcs/pcs_rdma_conn.h
new file mode 100644
index 000000000000..88c987e06cd6
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_conn.h
@@ -0,0 +1,6 @@
+#ifndef _PCS_RMDA_CONN_H_
+#define _PCS_RMDA_CONN_H_ 1
+
+void pcs_rdmaconnect_start(struct pcs_rpc *ep);
+
+#endif /* _PCS_RMDA_CONN_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
new file mode 100644
index 000000000000..629a81e8fa3c
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -0,0 +1,1720 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+
+#include <rdma/ib_verbs.h>
+
+#include "pcs_types.h"
+#include "pcs_rdma_io.h"
+#include "pcs_rdma_rw.h"
+#include "pcs_cluster.h"
+#include "pcs_rpc.h"
+#include "log.h"
+
+#define RDMA_THRESHOLD (5*1024)
+
+#define RDMA_MAX_MSG_PAYLOAD (32 << PAGE_SHIFT)
+#define RDMA_MAX_SEGMENTS 256
+
+enum {
+	PCS_RDMA_IO_ERROR,
+	PCS_RDMA_IO_CQE,
+};
+
+//#undef TRACE
+//#define TRACE(fmt, args...) printk(KERN_ERR "%s:%d: " fmt, __func__, __LINE__, ## args)
+
+struct rio_job
+{
+	struct list_head list;
+
+	int (*work)(struct rio_job *job);
+	void (*destroy)(struct rio_job *job);
+};
+
+struct rio_cqe
+{
+	enum ib_wc_status status;
+	int ib_wr_count;
+	void (*done)(struct rio_cqe *cqe, bool sync_mode);
+};
+
+struct rio_rx {
+	struct list_head list;
+
+	struct pcs_rdmaio *rio;
+	struct rio_cqe cqe;
+	struct ib_sge sge;
+	struct ib_recv_wr wr;
+};
+
+enum {
+	TX_FREE,                   /* free tx request available for use */
+	TX_WAIT_FOR_TX_COMPL,      /* tx request sent, wait for TX completion */
+	TX_WAIT_FOR_READ_ACK,      /* wait for peer to ack RDMA read */
+	TX_MSG_DONE,               /* default: call msg->done() */
+	TX_SUBMIT_RDMA_READ_ACK,   /* let our peer know that our RDMA_READ is done */
+};
+
+struct rio_tx {
+	struct list_head list;  /* either member of rio->dev->free_txs or rio->active_txs */
+	struct pcs_msg *msg;    /* msg to call ->done() when we're done */
+	u64 xid;                /* xid that we've read from wire; used to construct ACK */
+	int tx_state;           /* what we should do on TX completion; see enum above */
+
+	char *buf;
+	dma_addr_t dma_addr;
+
+	struct pcs_rdmaio *rio;
+
+	struct rio_cqe cqe;
+	struct rio_cqe err_cqe;
+
+	union {
+		struct {
+			struct ib_sge sge;
+			struct ib_send_wr wr;
+			struct pcs_rdma_msg msg;
+		} send;
+
+		struct {
+			struct ib_sge sge;
+			struct ib_rdma_wr wr;
+			struct pcs_rdma_msg msg;
+		} rdma_mr;
+
+		struct pcs_rdma_rw rdma_rw;
+	};
+
+	void (*cleanup)(struct rio_tx *tx);
+};
+
+struct rio_rdma_read_job {
+	struct rio_job job;
+
+	struct pcs_rdmaio *rio;
+	struct pcs_msg *msg;
+	int offset;
+	struct pcs_remote_buf rb;
+};
+
+struct pcs_rdma_device {
+	struct ib_device *ib_dev;
+	struct ib_pd *pd;
+
+	struct list_head free_txs; /* list head of free TX frames */
+	int free_txs_cnt;
+
+	struct pcs_ib_mr_pool ib_mr_pool;
+	struct pcs_rdma_mr_pool sd_mr_pool;
+	struct pcs_rdma_mr_pool rd_mr_pool;
+};
+
+extern bool rdmaio_use_map_for_mr;
+extern bool rdmaio_use_dma_mr_for_rdma_rw;
+extern unsigned int rdmaio_cq_count;
+extern unsigned int rdmaio_cq_period;
+
+static void rio_abort(struct pcs_rdmaio *rio, int error);
+
+static void rio_rx_done(struct rio_cqe *cqe, bool sync_mode);
+static void rio_tx_done(struct rio_cqe *cqe, bool sync_mode);
+static void rio_tx_err_occured(struct rio_cqe *cqe, bool sync_mode);
+
+/*
+ * A trivial helper representing 1:1 mapping between
+ * rio->rx_descs[RIO_N_RX] and rio->rx_bufs[queue_depth * RIO_MSG_SIZE]
+ */
+static char *rx2buf(struct pcs_rdmaio *rio, struct rio_rx *rx)
+{
+	return rio->rx_bufs + RIO_MSG_SIZE * (rx - rio->rx_descs);
+}
+static dma_addr_t rx2dma(struct pcs_rdmaio *rio, struct rio_rx *rx)
+{
+	return rio->rx_bufs_dma + RIO_MSG_SIZE * (rx - rio->rx_descs);
+}
+
+/* Only called when rio->write_queue is not empty */
+static struct pcs_msg *rio_dequeue_msg(struct pcs_rdmaio *rio)
+{
+	struct pcs_msg *msg = list_first_entry(&rio->write_queue,
+					       struct pcs_msg, list);
+	list_del_init(&msg->list);
+	return msg;
+}
+
+/* Only called when rio->reserved_queue is not empty */
+static struct pcs_msg *rio_dequeue_reserved_msg(struct pcs_rdmaio *rio)
+{
+	struct pcs_msg *msg = list_first_entry(&rio->reserved_queue,
+					       struct pcs_msg, list);
+	list_del_init(&msg->list);
+	return msg;
+}
+
+static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct pcs_msg *msg, int done)
+{
+	if (done) {
+		pcs_msg_sent(msg);
+		msg->done(msg);
+	} else {
+		tx->msg = msg;
+		list_add_tail(&tx->list, &rio->active_txs);
+	}
+}
+
+static struct rio_tx *rio_alloc_tx(struct pcs_rdma_device *dev,
+				   int state)
+{
+	struct rio_tx *tx;
+
+	tx = RE_NULL(kzalloc(sizeof(struct rio_tx), GFP_NOIO));
+	if (!tx)
+		return NULL;
+
+	tx->buf = RE_NULL(ib_dma_alloc_coherent(dev->ib_dev, RIO_MSG_SIZE,
+						&tx->dma_addr,
+						GFP_NOIO | __GFP_NOWARN));
+	if (!tx->buf) {
+		kfree(tx);
+		return NULL;
+	}
+
+	tx->tx_state = state;
+
+	return tx;
+}
+
+static void rio_free_tx(struct pcs_rdma_device *dev, struct rio_tx *tx)
+{
+	ib_dma_free_coherent(dev->ib_dev, RIO_MSG_SIZE, tx->buf, tx->dma_addr);
+	kfree(tx);
+}
+
+static struct rio_tx *rio_get_tx(struct pcs_rdma_device *dev)
+{
+	struct rio_tx *tx;
+
+	if (list_empty(&dev->free_txs))
+		return NULL;
+
+	tx = list_first_entry(&dev->free_txs, struct rio_tx, list);
+	list_del(&tx->list);
+	dev->free_txs_cnt--;
+	BUG_ON(dev->free_txs_cnt < 0);
+
+	BUG_ON(tx->tx_state != TX_FREE);
+
+	tx->tx_state = TX_MSG_DONE;
+	tx->xid = 0;
+
+	return tx;
+}
+
+static void rio_put_tx(struct pcs_rdma_device *dev, struct rio_tx *tx)
+{
+	BUG_ON(tx->tx_state == TX_FREE);
+
+	if (tx->cleanup) {
+		tx->cleanup(tx);
+		tx->cleanup = NULL;
+	}
+	tx->msg = NULL;
+	tx->xid = 0;
+	tx->tx_state = TX_FREE;
+
+	list_add(&tx->list, &dev->free_txs);
+	dev->free_txs_cnt++;
+}
+
+enum {
+	SUBMIT_REGULAR,
+	SUBMIT_NOOP,
+	SUBMIT_RDMA_READ_ACK,
+};
+
+static inline void rio_cqe_init(struct rio_cqe *cqe, int ib_wr_count,
+				void (*done)(struct rio_cqe *cqe, bool sync_mode))
+{
+	cqe->status = IB_WC_SUCCESS;
+	cqe->ib_wr_count = ib_wr_count;
+	cqe->done = done;
+}
+
+static inline void rio_job_init(struct rio_job *job,
+				int (*work)(struct rio_job *job),
+				void (*destroy)(struct rio_job *job))
+{
+	INIT_LIST_HEAD(&job->list);
+	job->work = work;
+	job->destroy = destroy;
+}
+
+static inline void rio_post_tx_job(struct pcs_rdmaio *rio,
+				   struct rio_job *job)
+{
+	list_add_tail(&job->list, &rio->tx_jobs);
+}
+
+static inline void rio_perform_tx_jobs(struct pcs_rdmaio *rio)
+{
+	struct rio_job *job, *tmp;
+
+	list_for_each_entry_safe(job, tmp, &rio->tx_jobs, list) {
+		if (job->work(job) == -EAGAIN)
+			break;
+		list_del(&job->list);
+		job->destroy(job);
+	}
+}
+
+static int rio_rx_post(struct pcs_rdmaio *rio, struct rio_rx *rx,
+		       u32 length)
+{
+	int ret;
+
+	if (rio->rio_state == RIO_STATE_ABORTED)
+		return -ECONNABORTED;
+
+	rx->rio = rio;
+
+	rx->sge.addr = rx2dma(rio, rx);
+	rx->sge.length = length;
+	rx->sge.lkey = rio->dev->pd->local_dma_lkey;
+
+	memset(&rx->wr, 0, sizeof(rx->wr));
+	rx->wr.wr_id = (uintptr_t)&rx->cqe;
+	rx->wr.sg_list = &rx->sge;
+	rx->wr.num_sge = 1;
+
+	rio_cqe_init(&rx->cqe, 1, rio_rx_done);
+
+	ret = RE_INV(ib_post_recv(rio->cmid->qp, &rx->wr, NULL));
+	if (ret) {
+		TRACE("ib_post_recv failed: %d, rio: 0x%p\n", ret, rio);
+	} else {
+		rio->n_rx_posted++;
+	}
+
+	return ret;
+}
+
+static int rio_tx_post(struct pcs_rdmaio *rio, struct rio_tx *tx,
+		       struct ib_send_wr *send_wr)
+{
+	struct ib_send_wr *wr;
+	int ib_wr_count = 0;
+	int ret;
+
+	if (rio->rio_state == RIO_STATE_ABORTED)
+		return -ECONNABORTED;
+
+	tx->rio = rio;
+
+	for (wr = send_wr; wr; wr = wr->next) {
+		BUG_ON(wr->wr_id);
+		BUG_ON(wr->send_flags & IB_SEND_SIGNALED);
+		if (wr->next) {
+			wr->wr_id = (uintptr_t)&tx->err_cqe;
+			wr->send_flags = 0;
+		} else {
+			wr->wr_id = (uintptr_t)&tx->cqe;
+			wr->send_flags = IB_SEND_SIGNALED;
+		}
+		ib_wr_count++;
+	}
+
+	rio_cqe_init(&tx->cqe, ib_wr_count, rio_tx_done);
+	rio_cqe_init(&tx->err_cqe, 0, rio_tx_err_occured);
+
+	if (rio->n_tx_posted + ib_wr_count > rio->max_send_wr) {
+		TRACE("ib send queue overflow: rio: 0x%p\n", rio);
+		return -ENOMEM;
+	}
+
+	ret = RE_INV(ib_post_send(rio->cmid->qp, send_wr, NULL));
+	if (ret) {
+		TRACE("ib_post_send failed: %d, rio: 0x%p\n", ret, rio);
+	} else {
+		rio->n_tx_posted += ib_wr_count;
+	}
+
+	return ret;
+}
+
+static int rio_tx_post_send(struct pcs_rdmaio *rio, struct rio_tx *tx,
+			    u32 length, struct ib_send_wr *first_wr,
+			    struct ib_send_wr *last_wr)
+{
+	struct ib_send_wr *send_wr;
+
+	tx->send.sge.addr = tx->dma_addr;
+	tx->send.sge.length = length;
+	tx->send.sge.lkey = rio->dev->pd->local_dma_lkey;
+
+	memset(&tx->send.wr, 0, sizeof(tx->send.wr));
+	tx->send.wr.opcode = IB_WR_SEND;
+	tx->send.wr.sg_list = &tx->send.sge;
+	tx->send.wr.num_sge = 1;
+
+	send_wr = &tx->send.wr;
+	if (first_wr && last_wr) {
+		last_wr->next = send_wr;
+		send_wr = first_wr;
+	}
+
+	return rio_tx_post(rio, tx, send_wr);
+}
+
+static int rio_tx_post_rdma_mr_read(struct pcs_rdmaio *rio, struct rio_tx *tx,
+				    u64 remote_addr, u32 rkey, u32 length)
+{
+	struct ib_send_wr *send_wr;
+
+	tx->rdma_mr.sge.addr = tx->rdma_mr.msg.iova;
+	tx->rdma_mr.sge.length = length;
+	tx->rdma_mr.sge.lkey = tx->rdma_mr.msg.lkey;
+
+	memset(&tx->rdma_mr.wr, 0, sizeof(tx->rdma_mr.wr));
+	tx->rdma_mr.wr.wr.opcode = IB_WR_RDMA_READ;
+	tx->rdma_mr.wr.wr.sg_list = &tx->rdma_mr.sge;
+	tx->rdma_mr.wr.wr.num_sge = 1;
+	tx->rdma_mr.wr.remote_addr = remote_addr;
+	tx->rdma_mr.wr.rkey = rkey;
+
+	send_wr = &tx->rdma_mr.wr.wr;
+	if (tx->rdma_mr.msg.first_wr && tx->rdma_mr.msg.last_wr) {
+		tx->rdma_mr.msg.last_wr->next = send_wr;
+		send_wr = tx->rdma_mr.msg.first_wr;
+	}
+
+	return rio_tx_post(rio, tx, send_wr);
+}
+
+static int rio_tx_post_rdma_rw_read(struct pcs_rdmaio *rio, struct rio_tx *tx)
+{
+	if (!tx->rdma_rw.nr_wrs)
+		return -EINVAL;
+	return rio_tx_post(rio, tx, &tx->rdma_rw.wrs->wr);
+}
+
+static void rio_tx_cleanup_rdma_mr(struct rio_tx *tx)
+{
+	pcs_rdma_msg_destroy(&tx->rdma_mr.msg);
+}
+
+static void rio_tx_cleanup_rdma_rw(struct rio_tx *tx)
+{
+	pcs_rdma_rw_destroy(&tx->rdma_rw);
+}
+
+static int rio_submit_rdma_read(struct pcs_rdmaio *rio, struct pcs_msg *msg,
+				int offset, struct pcs_remote_buf *rb, bool allow_again)
+{
+	struct pcs_rdma_device *dev = rio->dev;
+	struct rio_tx *tx;
+
+	tx = RE_NULL(rio_get_tx(dev));
+	if (!tx) {
+		if (allow_again)
+			return -EAGAIN;
+		goto fail;
+	}
+
+	BUG_ON(!rb);
+
+	tx->tx_state = TX_SUBMIT_RDMA_READ_ACK;
+	tx->msg = msg;
+	tx->xid = rb->xid;
+
+	if (rdmaio_use_dma_mr_for_rdma_rw) {
+		if (pcs_rdma_rw_init_from_msg(&tx->rdma_rw, rio->cmid->device,
+					      DMA_FROM_DEVICE, rb->rbuf, rb->rkey,
+					      rio->dev->pd->local_dma_lkey, msg, offset,
+					      offset + rb->rlen, GFP_NOIO, rio->cmid->qp->max_read_sge)) {
+			TRACE("pcs_rdma_rw_init_from_msg failed, try fallback: rio: 0x%p\n", rio);
+			goto fallback;
+		}
+		tx->cleanup = rio_tx_cleanup_rdma_rw;
+
+		if (rio_tx_post_rdma_rw_read(rio, tx)) {
+			TRACE("rio_tx_post_rdma_rw_read failed: rio: 0x%p\n", rio);
+			goto fail;
+		}
+	} else {
+fallback:
+		if (pcs_rdma_msg_init(&tx->rdma_mr.msg, msg, offset, offset + rb->rlen,
+				      &dev->rd_mr_pool, rdmaio_use_map_for_mr)) {
+			TRACE("pcs_rdma_msg_init failed: rio: 0x%p\n", rio);
+			goto fail;
+		}
+		tx->cleanup = rio_tx_cleanup_rdma_mr;
+
+		if (rio_tx_post_rdma_mr_read(rio, tx, rb->rbuf, rb->rkey, rb->rlen)) {
+			TRACE("rio_tx_post_rdma_mr_read failed: rio: 0x%p\n", rio);
+			goto fail;
+		}
+	}
+
+	return 0;
+
+fail:
+	if (tx)
+		rio_put_tx(dev, tx);
+	pcs_free_msg(msg);
+	rio_abort(rio, PCS_ERR_NET_ABORT);
+
+	return -EIO;
+}
+
+static int rio_rdma_read_job_work(struct rio_job *j)
+{
+	struct rio_rdma_read_job *job = container_of(j, struct rio_rdma_read_job, job);
+	struct pcs_rdmaio *rio = job->rio;
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED) {
+		pcs_free_msg(job->msg);
+		return 0;
+	}
+
+	return rio_submit_rdma_read(rio, job->msg, job->offset,
+				    &job->rb, true);
+}
+
+static void rio_rdma_read_job_destroy(struct rio_job *j)
+{
+	struct rio_rdma_read_job *job = container_of(j, struct rio_rdma_read_job, job);
+	kfree(job);
+}
+
+static inline struct rio_rdma_read_job* rio_rdma_read_job_alloc(struct pcs_rdmaio *rio,
+								struct pcs_msg *msg,
+								int offset,
+								struct pcs_remote_buf *rb)
+{
+	struct rio_rdma_read_job *job;
+
+	job = RE_NULL(kzalloc(sizeof(struct rio_rdma_read_job), GFP_NOIO));
+	if (!job)
+		return NULL;
+
+	rio_job_init(&job->job, rio_rdma_read_job_work, rio_rdma_read_job_destroy);
+	job->rio = rio;
+	job->msg = msg;
+	job->offset = offset;
+	memcpy(&job->rb, rb, sizeof(job->rb));
+
+	return job;
+}
+
+static int msg_is_large(struct pcs_msg *msg)
+{
+	int hdr_len = sizeof(struct pcs_rdmaio_hdr);
+	return msg->size + hdr_len > RDMA_THRESHOLD;
+}
+
+static int rio_init_msg(char *buf, int payload_size, int credits, int submit_type,
+			struct pcs_remote_buf **rb, struct pcs_rdma_ack **rack)
+{
+	struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+	int hdr_len = sizeof(*hdr);
+	int type = RIO_MSG_IMMEDIATE;
+	int addon_len = 0;
+
+	switch (submit_type) {
+	case SUBMIT_NOOP:
+		type = RIO_MSG_NOOP;
+		break;
+	case SUBMIT_REGULAR:
+		if (hdr_len + payload_size > RDMA_THRESHOLD) {
+			type = RIO_MSG_RDMA_READ_REQ;
+			*rb = (struct pcs_remote_buf *)(buf + hdr_len);
+			addon_len = sizeof(struct pcs_remote_buf);
+		}
+		break;
+	case SUBMIT_RDMA_READ_ACK:
+		type = RIO_MSG_RDMA_READ_ACK;
+		*rack = (struct pcs_rdma_ack *)(buf + hdr_len);
+		addon_len = sizeof(struct pcs_rdma_ack);
+		break;
+	default:
+		BUG();
+	}
+
+	hdr->magic = RIO_MAGIC;
+	hdr->version = RIO_VERSION;
+	hdr->type = type;
+	hdr->size = hdr_len + addon_len;
+	hdr->credits = credits;
+
+	return hdr->size;
+}
+
+static void rio_update_msg_immediate(char *buf, int copied)
+{
+	struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+
+	hdr->size += copied;
+}
+
+static void rio_tx_cleanup_send(struct rio_tx *tx)
+{
+	pcs_rdma_msg_destroy(&tx->send.msg);
+}
+
+static int rio_submit(struct pcs_rdmaio *rio, struct pcs_msg *msg, int type, u64 xid, int status,
+		      bool allow_again)
+{
+	struct pcs_rdma_device *dev = rio->dev;
+	struct rio_tx *tx;
+	struct ib_send_wr *first_tx_wr = NULL;
+	struct ib_send_wr *last_tx_wr = NULL;
+	int credits = rio->n_os_credits;
+	int msg_size = msg ? msg->size : 0;
+	struct pcs_remote_buf *rb = NULL;
+	struct pcs_rdma_ack *rack = NULL;
+	int hdr_len;
+	size_t tx_length;
+	char *payload;
+	int offset = 0;
+	struct iov_iter it;
+
+	tx = RE_NULL(rio_get_tx(dev));
+	if (!tx) {
+		if (allow_again)
+			return -EAGAIN;
+		goto fail;
+	}
+
+	hdr_len = rio_init_msg(tx->buf, msg_size, credits, type, &rb, &rack);
+	tx_length = hdr_len;
+	payload = tx->buf + hdr_len;
+
+	if (rack) {
+		rack->xid    = xid;
+		rack->status = status;
+	} else if (rb) {
+		rio->xid_generator++;
+		rb->xid = tx->xid = rio->xid_generator;
+		tx->tx_state = TX_WAIT_FOR_TX_COMPL;
+	}
+
+	iov_iter_kvec(&it, WRITE, NULL, 0, 0);
+	while (offset < msg_size) {
+		size_t copy, res;
+
+		if (!iov_iter_count(&it))
+			msg->get_iter(msg, offset, &it, WRITE);
+
+		copy = iov_iter_count(&it);
+		if (copy > msg_size - offset)
+			copy = msg_size - offset;
+
+		if (hdr_len + offset + copy > RDMA_THRESHOLD) {
+			if (tx_length < hdr_len + rio->hdr_size) {
+				copy = RDMA_THRESHOLD - offset - hdr_len;
+			} else {
+				if (pcs_rdma_msg_init(&tx->send.msg, msg, offset, msg_size,
+						      &dev->sd_mr_pool, rdmaio_use_map_for_mr)) {
+					TRACE("pcs_rdma_msg_init failed: rio: 0x%p\n", rio);
+					goto fail;
+				}
+				tx->cleanup = rio_tx_cleanup_send;
+				first_tx_wr = tx->send.msg.first_wr;
+				last_tx_wr = tx->send.msg.last_wr;
+				rb->rbuf = tx->send.msg.iova;
+				rb->rkey = tx->send.msg.rkey;
+				rb->rlen = msg_size - offset;
+				break;
+			}
+		}
+
+		res = copy_from_iter(payload + offset, copy, &it);
+		BUG_ON(res != copy);
+		tx_length += copy;
+
+		offset += copy;
+		rio_update_msg_immediate(tx->buf, copy);
+	}
+
+	if (rio_tx_post_send(rio, tx, tx_length, first_tx_wr, last_tx_wr)) {
+		TRACE("rio_tx_post_send failed: rio: 0x%p\n", rio);
+		goto fail;
+	}
+
+	rio->n_os_credits -= credits;
+	if (msg) {
+		rio->n_peer_credits--;
+		if (rb)
+			rio->n_reserved_credits--;
+		BUG_ON(rio->n_peer_credits < 0);
+		BUG_ON(rio->n_reserved_credits < 0);
+
+		/*
+		 * It's possible to see RX completion for response to this message
+		 * *before* we see TX completion for this message. This will result
+		 * in RPC's handle_response failing to find corresponding TX by xid.
+		 *
+		 * Thus, we shouldn't wait for TX completion to tell upper layer that
+		 * the message has been sent and do it right after
+		 * rio_tx_post_send completes (similar to TCP). If
+		 * rio_tx_post_send() fails eventually, we will receive TX
+		 * completion with an error flag and cancel all
+		 * outstanding/pending RPC requests. So we are not going to
+		 * lose an error.
+		 *
+		 * But, if the message is big enough to trigger RDMA READ
+		 * transfer, we are going to call ->done() callback after we
+		 * receive RDMA_READ_ACK message from our peer. Since messages
+		 * in a single RX queue are guaranteed to come in order, there
+		 * is no race in this case.
+		 */
+		rio_msg_sent(rio, tx, msg, rb == NULL);
+	}
+
+	return 0;
+
+fail:
+	if (tx)
+		rio_put_tx(dev, tx);
+	if (msg)
+		list_add(&msg->list, &rio->write_queue);
+	rio_abort(rio, PCS_ERR_NET_ABORT);
+
+	return -EIO;
+}
+
+static inline void rio_enable_kick(struct pcs_rdmaio *rio)
+{
+	rio->no_kick--;
+	BUG_ON(rio->no_kick < 0);
+}
+
+static inline void rio_disable_kick(struct pcs_rdmaio *rio)
+{
+	rio->no_kick++;
+}
+
+static inline void rio_kick_write_queue(struct pcs_rdmaio *rio)
+{
+	if (rio->no_kick)
+		return;
+	rio_disable_kick(rio);
+
+	rio_perform_tx_jobs(rio);
+
+	/* Main loop sending large messages from reserved_queue */
+	while (rio->rio_state == RIO_STATE_ESTABLISHED &&
+	       rio->dev->free_txs_cnt > rio->queue_depth &&
+	       !list_empty(&rio->reserved_queue) && rio->n_peer_credits &&
+	       rio->n_reserved_credits) {
+		struct pcs_msg *msg = rio_dequeue_reserved_msg(rio);
+		if (rio_submit(rio, msg, SUBMIT_REGULAR, 0, 0, true) == -EAGAIN) {
+			list_add(&msg->list, &rio->reserved_queue);
+			break;
+		}
+	}
+
+	/* Main loop sending ordinary messages from write_queue */
+	while (rio->rio_state == RIO_STATE_ESTABLISHED &&
+	       rio->dev->free_txs_cnt > rio->queue_depth &&
+	       !list_empty(&rio->write_queue) && rio->n_peer_credits) {
+		struct pcs_msg *msg = rio_dequeue_msg(rio);
+
+		if (!rio->n_reserved_credits && msg_is_large(msg)) {
+			list_add_tail(&msg->list, &rio->reserved_queue);
+		} else if (rio_submit(rio, msg, SUBMIT_REGULAR, 0, 0, true) == -EAGAIN) {
+			list_add(&msg->list, &rio->write_queue);
+			break;
+		}
+	}
+
+	/* Return credits by NOOP only if we have enough of them to return AND
+	 * we cannot piggyback it by sending a message from write_queue */
+	if (rio->rio_state == RIO_STATE_ESTABLISHED &&
+	    rio->dev->free_txs_cnt > rio->queue_depth &&
+	    rio->n_os_credits >= rio->n_th_credits)
+		rio_submit(rio, NULL, SUBMIT_NOOP, 0, 0, true);
+
+	rio_enable_kick(rio);
+}
+
+static void rio_handle_tx(struct pcs_rdmaio *rio, struct rio_tx *tx, int ok)
+{
+	struct pcs_msg *msg = tx->msg;
+	u64 xid = tx->xid;
+
+	/* override remote success if we already aborted */
+	if (rio->rio_state == RIO_STATE_ABORTED)
+		ok = 0;
+
+	if (!ok)
+		rio_abort(rio, PCS_ERR_NET_ABORT);
+
+	switch (tx->tx_state) {
+		case TX_SUBMIT_RDMA_READ_ACK:
+			rio_put_tx(rio->dev, tx);
+			rio_submit(rio, NULL, SUBMIT_RDMA_READ_ACK, xid, !ok, false);
+			break;
+		case TX_WAIT_FOR_TX_COMPL:
+		case TX_WAIT_FOR_READ_ACK:
+			if (++tx->tx_state != TX_MSG_DONE)
+				return;
+		case TX_MSG_DONE:
+			rio_put_tx(rio->dev, tx);
+			break;
+		default:
+			BUG();
+	}
+
+	if (msg) {
+		if (!ok)
+			pcs_set_local_error(&msg->error, PCS_ERR_NET_ABORT);
+
+		rio_msg_sent(rio, NULL, msg, 1);
+	}
+}
+
+/*
+ * rio wire header is already stripped, buf points to payload data (pcs_rpc hdr)
+ */
+static int rio_handle_rx_immediate(struct pcs_rdmaio *rio, char *buf, int len,
+				   struct pcs_remote_buf *rb, int *throttle)
+{
+	struct pcs_msg *msg;
+	u32 msg_size;
+	int offset = rio->hdr_size;
+	struct iov_iter it;
+
+	if (len < rio->hdr_size) {
+		TRACE("rio read short msg: %d < %d, rio: 0x%p\n", len,
+		      rio->hdr_size, rio);
+		return PCS_ERR_NET_ABORT;
+	}
+
+	msg = rio->netio.getmsg(&rio->netio, buf, &msg_size);
+	if (msg == NULL) {
+		int err = 0;
+		if (rio->throttled)
+			*throttle = 1;
+		else
+			err = PCS_ERR_NOMEM;
+		return err;
+	} else if (msg == PCS_TRASH_MSG) {
+		TRACE("rio drop trash msg: %u, rio: 0x%p\n", msg_size, rio);
+		return 0;
+	}
+
+	if (msg->size != len + (rb ? rb->rlen : 0)) {
+		TRACE("rio read wrong len: %d != %d (%llx/%x/%d), rio: 0x%p",
+		      len, msg->size, rb ? rb->rbuf : 0ULL,
+		      rb ? rb->rkey : 0, rb ? rb->rlen : -1,
+		      rio);
+		pcs_free_msg(msg);
+		return PCS_ERR_NET_ABORT;
+	}
+
+	iov_iter_kvec(&it, READ, NULL, 0, 0);
+	while (offset < len) {
+		size_t body_len, res;
+
+		if (!iov_iter_count(&it))
+			msg->get_iter(msg, offset, &it, READ);
+
+		body_len = iov_iter_count(&it);
+		if (body_len > len - offset)
+			body_len = len - offset;
+
+		res = copy_to_iter(buf + offset, body_len, &it);
+		BUG_ON(res != body_len);
+
+		offset += body_len;
+	}
+
+	if (len == msg->size) {
+		msg->done(msg);
+	} else if (rio_submit_rdma_read(rio, msg, offset, rb, true) == -EAGAIN) {
+		struct rio_rdma_read_job *job;
+		job = rio_rdma_read_job_alloc(rio, msg, offset, rb);
+		if (!job)
+			rio_submit_rdma_read(rio, msg, offset, rb, false);
+		else
+			rio_post_tx_job(rio, &job->job);
+	}
+
+	return 0;
+}
+
+static int rio_handle_rx_read_ack(struct pcs_rdmaio *rio,
+				  struct pcs_rdma_ack *rack)
+{
+	struct rio_tx *tx;
+
+	list_for_each_entry(tx, &rio->active_txs, list)
+		if (tx->xid == rack->xid) {
+			list_del(&tx->list);
+			rio_handle_tx(rio, tx, !rack->status);
+			return 0;
+		}
+
+	return PCS_ERR_NET_ABORT;
+}
+
+/*
+ * When we see RX coming from the wire very first time, flag "pended" is
+ * false and we naturally update n_rx_posted and n_peer_credits.
+ *
+ * Later on, due to throttling, the RX may reside in pended_rxs for a while.
+ * Then, handling unthrottle event, we will see this RX again, the "pended"
+ * flag is true. This means we should not touch n_rx_posted and
+ * n_peer_credits again.
+ */
+static void rio_handle_rx(struct pcs_rdmaio *rio, struct rio_rx *rx,
+			  enum ib_wc_status status, int pended)
+{
+	char *buf = rx2buf(rio, rx);
+	int ok = (status == IB_WC_SUCCESS) &&
+		 (rio->rio_state == RIO_STATE_ESTABLISHED);
+	char *payload = NULL;
+	int payload_size = 0;
+	int credits = 0;
+	int throttle = 0;
+	int type;
+	int err = PCS_ERR_NET_ABORT;
+	struct pcs_remote_buf *rb   = NULL;
+	struct pcs_rdma_ack *rack = NULL;
+
+	if (!ok) {
+		rio_abort(rio, PCS_ERR_NET_ABORT);
+		return;
+	}
+
+	type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+			     rio->queue_depth);
+
+	switch (type) {
+	case RIO_MSG_IMMEDIATE:
+	case RIO_MSG_RDMA_READ_REQ:
+		err = rio_handle_rx_immediate(rio, payload, payload_size, rb, &throttle);
+		if (err)
+			goto do_abort;
+		break;
+	case RIO_MSG_NOOP:
+		/* for now, it only returns credits */
+		break;
+	case RIO_MSG_RDMA_READ_ACK:
+		BUG_ON(!rack);
+		err = rio_handle_rx_read_ack(rio, rack);
+		if (err)
+			goto do_abort;
+		break;
+	default:
+		goto do_abort;
+	}
+
+	if (!throttle) {
+		if (rio_rx_post(rio, rx, RIO_MSG_SIZE)) {
+			TRACE("rio_rx_post failed: rio: 0x%p\n", rio);
+			rio_abort(rio, PCS_ERR_NET_ABORT);
+			return;
+		}
+
+		if (type != RIO_MSG_NOOP &&
+		    type != RIO_MSG_RDMA_READ_ACK)
+			rio->n_os_credits++;
+
+		if (type == RIO_MSG_RDMA_READ_ACK)
+			rio->n_reserved_credits++;
+
+		BUG_ON(rio->n_reserved_credits > rio->queue_depth);
+		if (rio->n_os_credits > rio->queue_depth) {
+			TRACE("n_os_credits overflow: rio: 0x%p\n", rio);
+			rio_abort(rio, PCS_ERR_NET_ABORT);
+			return;
+		}
+	} else
+		list_add(&rx->list, &rio->pended_rxs);
+
+	if (!pended)
+		rio->n_peer_credits += credits;
+
+	return;
+
+do_abort:
+	rio_abort(rio, err);
+}
+
+static void rio_handle_pended_rxs(struct pcs_rdmaio *rio)
+{
+	LIST_HEAD(local);
+
+	list_splice_init(&rio->pended_rxs, &local);
+
+	while (!list_empty(&local)) {
+		struct rio_rx *rx;
+
+		rx = list_first_entry(&local, struct rio_rx, list);
+		list_del(&rx->list);
+
+		rio_handle_rx(rio, rx, IB_WC_SUCCESS, 1);
+	}
+}
+
+static void rio_rx_done(struct rio_cqe *cqe, bool sync_mode)
+{
+	struct rio_rx *rx = container_of(cqe, struct rio_rx, cqe);
+	struct pcs_rdmaio *rio = rx->rio;
+
+	rio->n_rx_posted -= cqe->ib_wr_count;
+	BUG_ON(rio->n_rx_posted < 0);
+
+	if (sync_mode) {
+		if (!rio_rx_post(rio, rx, RIO_MSG_SIZE))
+			rio->n_os_credits++;
+	} else {
+		rio_handle_rx(rio, rx, cqe->status, 0);
+	}
+}
+
+static void rio_tx_err_occured(struct rio_cqe *cqe, bool sync_mode)
+{
+	TRACE("status: %d\n", cqe->status);
+}
+
+static void rio_tx_done(struct rio_cqe *cqe, bool sync_mode)
+{
+	struct rio_tx *tx = container_of(cqe, struct rio_tx, cqe);
+	struct pcs_rdmaio *rio = tx->rio;
+
+	if (cqe->status == IB_WC_SUCCESS)
+		cqe->status = tx->err_cqe.status;
+
+	rio->n_tx_posted -= cqe->ib_wr_count;
+	BUG_ON(rio->n_tx_posted < 0);
+
+	if (sync_mode)
+		rio_put_tx(rio->dev, tx);
+	else
+		rio_handle_tx(rio, tx, cqe->status == IB_WC_SUCCESS);
+}
+
+static inline struct rio_cqe* rio_poll_cq(struct pcs_rdmaio *rio)
+{
+	struct rio_cqe *cqe = NULL;
+
+	if (rio->wc_idx >= rio->wc_cnt) {
+		rio->wc_cnt = ib_poll_cq(rio->cq, ARRAY_SIZE(rio->wcs),
+					 rio->wcs);
+		rio->wc_idx = 0;
+	}
+
+	if (rio->wc_idx < rio->wc_cnt) {
+		cqe = (void*)rio->wcs[rio->wc_idx].wr_id;
+		if (cqe->status == IB_WC_SUCCESS &&
+		    rio->wcs[rio->wc_idx].status != IB_WC_SUCCESS)
+			cqe->status = rio->wcs[rio->wc_idx].status;
+		rio->wc_idx++;
+	}
+
+	return cqe;
+}
+
+static inline int rio_req_notify_cq(struct pcs_rdmaio *rio)
+{
+	return ib_req_notify_cq(rio->cq, IB_CQ_NEXT_COMP |
+				IB_CQ_REPORT_MISSED_EVENTS);
+}
+
+static void pcs_rdma_cq_comp_handler(struct ib_cq *cq, void *private)
+{
+	struct pcs_rdmaio *rio = private;
+	struct pcs_rpc *ep = rio->netio.parent;
+
+	set_bit(PCS_RDMA_IO_CQE, &rio->io_flags);
+	wake_up(&rio->waitq);
+	pcs_rpc_kick_queue(ep);
+}
+
+static inline int rio_comp_perform(struct pcs_rdmaio *rio)
+{
+	struct rio_cqe *cqe;
+	int count = 0;
+
+	while ((cqe = rio_poll_cq(rio))) {
+		rio_disable_kick(rio);
+		cqe->done(cqe, false);
+		rio_enable_kick(rio);
+
+		rio_kick_write_queue(rio);
+		count++;
+	}
+
+	return count;
+}
+
+static void pcs_rdma_cq_event_handler(struct ib_event *event, void *private)
+{
+	struct pcs_rdmaio *rio = private;
+	TRACE("rio: 0x%p\n", rio);
+}
+
+static int pcs_rdma_io_event_handler(struct rdma_cm_id *cmid,
+		struct rdma_cm_event *event)
+{
+	struct pcs_rdma_id *id = cmid->context;
+	struct pcs_rdmaio *rio = container_of(id, struct pcs_rdmaio, id);
+	struct pcs_rpc *ep = rio->netio.parent;
+
+	TRACE("rio: 0x%p, event: %d, status: %d\n", rio, event->event, event->status);
+
+	set_bit(PCS_RDMA_IO_ERROR, &rio->io_flags);
+	pcs_rpc_kick_queue(ep);
+
+	return 0;
+}
+
+static void pcs_rdma_qp_event_handler(struct ib_event *event, void *context)
+{
+	struct pcs_rdmaio *rio = context;
+	TRACE("rio: 0x%p, event: %d\n", rio, event->event);
+}
+
+static struct pcs_rdma_device *pcs_rdma_device_create(struct rdma_cm_id *cmid,
+						      int queue_depth,
+						      int send_queue_depth)
+{
+	struct pcs_rdma_device *dev;
+	struct rio_tx *tx;
+	u32 max_num_sg = min_t(u32, RDMA_MAX_SEGMENTS,
+			       cmid->device->attrs.max_fast_reg_page_list_len);
+	int i;
+
+	dev = RE_NULL(kzalloc(sizeof(*dev), GFP_NOIO));
+	if (!dev)
+		return NULL;
+
+	dev->ib_dev = cmid->device;
+
+	INIT_LIST_HEAD(&dev->free_txs);
+	for (i = 0; i < send_queue_depth; i++) {
+		tx = rio_alloc_tx(dev, TX_MSG_DONE);
+		if (!tx)
+			goto free_bufs;
+		rio_put_tx(dev, tx);
+	}
+
+	dev->pd = RE_PTR_INV(ib_alloc_pd(dev->ib_dev, 0));
+	if (IS_ERR(dev->pd)) {
+		TRACE("ib_alloc_pd failed: dev: 0x%p\n", dev);
+		goto free_bufs;
+	}
+
+	if (pcs_ib_mr_pool_init(&dev->ib_mr_pool, dev->pd, IB_MR_TYPE_MEM_REG,
+			    max_num_sg,
+			    queue_depth * 2)) {
+		TRACE("pcs_ib_mr_pool_init failed: dev: 0x%p\n", dev);
+		goto free_pd;
+	}
+
+	if (pcs_rdma_mr_pool_init(&dev->sd_mr_pool, RDMA_MAX_MSG_PAYLOAD,
+				  queue_depth, dev->ib_dev, dev->pd, DMA_TO_DEVICE,
+				  GFP_NOIO, &dev->ib_mr_pool)) {
+		TRACE("pcs_rdma_mr_pool_init failed: dev: 0x%p\n", dev);
+		goto free_ib_mr;
+	}
+
+	if (pcs_rdma_mr_pool_init(&dev->rd_mr_pool, RDMA_MAX_MSG_PAYLOAD,
+				  queue_depth, dev->ib_dev, dev->pd, DMA_FROM_DEVICE,
+				  GFP_NOIO, &dev->ib_mr_pool)) {
+		TRACE("pcs_rdma_mr_pool_init failed: dev: 0x%p\n", dev);
+		goto free_sd_mr;
+	}
+
+	return dev;
+
+free_sd_mr:
+	pcs_rdma_mr_pool_destroy(&dev->sd_mr_pool);
+free_ib_mr:
+	pcs_ib_mr_pool_destroy(&dev->ib_mr_pool);
+free_pd:
+	ib_dealloc_pd(dev->pd);
+free_bufs:
+	while ((tx = rio_get_tx(dev)))
+		rio_free_tx(dev, tx);
+	kfree(dev);
+	return NULL;
+}
+
+static void pcs_rdma_device_destroy(struct pcs_rdma_device *dev)
+{
+	struct rio_tx *tx;
+
+	pcs_rdma_mr_pool_destroy(&dev->rd_mr_pool);
+	pcs_rdma_mr_pool_destroy(&dev->sd_mr_pool);
+	pcs_ib_mr_pool_destroy(&dev->ib_mr_pool);
+
+	ib_dealloc_pd(dev->pd);
+
+	while ((tx = rio_get_tx(dev)))
+		rio_free_tx(dev, tx);
+
+	kfree(dev);
+}
+
+struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
+				   int queue_depth, struct pcs_rpc *ep)
+{
+	struct pcs_rdmaio *rio;
+	struct rio_rx *rx;
+	struct ib_cq_init_attr cq_attr = {};
+	struct ib_qp_init_attr qp_attr = {};
+	int recv_queue_depth = queue_depth * 2 + 2;
+	int send_queue_depth = queue_depth * 4 + 4;
+	int rx_descs_siz = recv_queue_depth * sizeof(struct rio_rx);
+	int rx_bufs_siz = recv_queue_depth * RIO_MSG_SIZE;
+	static atomic_t comp_vector = ATOMIC_INIT(-1);
+	unsigned int cq_count = rdmaio_cq_count;
+	unsigned int cq_period = rdmaio_cq_period;
+	int max_recv_wr;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	BUILD_BUG_ON((TX_WAIT_FOR_READ_ACK - TX_WAIT_FOR_TX_COMPL) != 1);
+	BUILD_BUG_ON((TX_MSG_DONE - TX_WAIT_FOR_READ_ACK) != 1);
+
+	if (queue_depth < RIO_QUEUE_DEPTH)
+		queue_depth = RIO_QUEUE_DEPTH;
+	else if (queue_depth > RIO_MAX_QUEUE_DEPTH)
+		queue_depth = RIO_MAX_QUEUE_DEPTH;
+
+	rio = RE_NULL(kzalloc(sizeof(struct pcs_rdmaio), GFP_NOIO));
+	if (!rio)
+		return NULL;
+
+	rio->netio.parent = pcs_rpc_get(ep);
+	rio->id.event_handler = pcs_rdma_io_event_handler;
+
+	init_waitqueue_head(&rio->waitq);
+
+	rio->rio_state = RIO_STATE_CONNECTING;
+	rio->rio_error = PCS_ERR_NET_ABORT;
+
+	rio->hdr_size = hdr_size;
+	rio->queue_depth = queue_depth;
+	rio->send_queue_depth = send_queue_depth;
+
+	INIT_LIST_HEAD(&rio->tx_jobs);
+	INIT_LIST_HEAD(&rio->pended_rxs);
+
+	rio->n_peer_credits = queue_depth;
+	rio->n_reserved_credits = queue_depth;
+	rio->n_os_credits = 0;
+	rio->n_th_credits = queue_depth / 2;
+
+	rio->cmid = cmid;
+
+	INIT_LIST_HEAD(&rio->write_queue);
+	INIT_LIST_HEAD(&rio->reserved_queue);
+
+	rio->no_kick = 0;
+	rio->throttled = 0;
+
+	INIT_LIST_HEAD(&rio->active_txs);
+
+	rio->xid_generator = 0;
+
+	rio->conn_req.magic = RIO_MAGIC;
+	rio->conn_req.version = RIO_VERSION;
+	rio->conn_req.queue_depth = queue_depth;
+	rio->conn_req.msg_size = RIO_MSG_SIZE;
+
+	rio->rx_descs = RE_NULL(kzalloc(rx_descs_siz, GFP_NOIO | __GFP_NOWARN));
+	if (!rio->rx_descs)
+		goto free_rio;
+
+	rio->rx_bufs_size = rx_bufs_siz;
+	rio->rx_bufs = RE_NULL(ib_dma_alloc_coherent(rio->cmid->device, rio->rx_bufs_size,
+						     &rio->rx_bufs_dma,
+						     GFP_NOIO | __GFP_NOWARN));
+	if (!rio->rx_bufs) {
+		TRACE("ib_dma_alloc_coherent failed: rio: 0x%p\n", rio);
+		goto free_desc;
+	}
+
+	rio->dev = pcs_rdma_device_create(rio->cmid, queue_depth, send_queue_depth);
+	if (!rio->dev) {
+		TRACE("pcs_rdma_device_create failed: rio: 0x%p\n", rio);
+		goto free_bufs;
+	}
+
+	max_recv_wr = recv_queue_depth;
+	rio->max_send_wr = max_t(int, send_queue_depth * 4,
+				 DIV_ROUND_UP(send_queue_depth * (RDMA_MAX_MSG_PAYLOAD >> PAGE_SHIFT),
+					      rio->cmid->device->attrs.max_send_sge));
+
+	cq_attr.cqe = max_recv_wr + rio->max_send_wr;
+	cq_attr.comp_vector = (unsigned int)atomic_inc_return(&comp_vector) %
+		rio->cmid->device->num_comp_vectors;
+	rio->cq = RE_PTR_INV(ib_create_cq(rio->cmid->device, pcs_rdma_cq_comp_handler,
+					  pcs_rdma_cq_event_handler, rio, &cq_attr));
+	if (IS_ERR(rio->cq)) {
+		TRACE("ib_create_cq failed: rio: 0x%p\n", rio);
+		goto free_dev;
+	}
+	if (cq_count && cq_period) {
+		int ret = rdma_set_cq_moderation(rio->cq, cq_count, cq_period);
+		TRACE("rio: 0x%p, set cq moderation: cq_count %u, cq_period: %u, ret: %d\n",
+		      rio, cq_count, cq_period, ret);
+	}
+	ib_req_notify_cq(rio->cq, IB_CQ_NEXT_COMP);
+
+	qp_attr.event_handler = pcs_rdma_qp_event_handler;
+	qp_attr.qp_context = rio;
+	qp_attr.cap.max_send_wr = rio->max_send_wr;
+	qp_attr.cap.max_send_sge = rio->cmid->device->attrs.max_send_sge;
+	qp_attr.cap.max_recv_wr = max_recv_wr;
+	qp_attr.cap.max_recv_sge = 1;
+	qp_attr.send_cq = rio->cq;
+	qp_attr.recv_cq = rio->cq;
+	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	qp_attr.qp_type = IB_QPT_RC;
+
+	TRACE("rio: 0x%p, max_send wr/sge: %u/%u, max_recv wr/sge: %u/%u\n",
+	      rio, qp_attr.cap.max_send_wr, qp_attr.cap.max_send_sge,
+	      qp_attr.cap.max_recv_wr, qp_attr.cap.max_recv_sge);
+	if (RE_INV(rdma_create_qp(rio->cmid, rio->dev->pd, &qp_attr))) {
+		TRACE("rdma_create_qp failed: rio: 0x%p\n", rio);
+		goto free_cq;
+	}
+
+	for (rx = rio->rx_descs; rx - rio->rx_descs < recv_queue_depth; rx++)
+		if (rio_rx_post(rio, rx, RIO_MSG_SIZE)) {
+			TRACE("rio_rx_post failed: rio: 0x%p\n", rio);
+			break;
+		}
+
+	if (rio->n_rx_posted != recv_queue_depth)
+		goto free_qp;
+
+	TRACE("rio: 0x%p, dev: 0x%p, queue_depth: %d\n", rio, rio->dev, queue_depth);
+
+	return rio;
+
+free_qp:
+	rdma_destroy_qp(rio->cmid);
+free_cq:
+	ib_destroy_cq(rio->cq);
+free_dev:
+	pcs_rdma_device_destroy(rio->dev);
+free_bufs:
+	ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
+			     rio->rx_bufs, rio->rx_bufs_dma);
+free_desc:
+	kfree(rio->rx_descs);
+free_rio:
+	pcs_rpc_put(rio->netio.parent);
+	kfree(rio);
+	return NULL;
+}
+
+static void rio_cleanup(struct pcs_rdmaio *rio)
+{
+	rio_perform_tx_jobs(rio);
+
+	while (!list_empty(&rio->write_queue)) {
+		struct pcs_msg * msg = rio_dequeue_msg(rio);
+
+		pcs_msg_sent(msg);
+		pcs_set_local_error(&msg->error, rio->rio_error);
+		msg->done(msg);
+	}
+
+	while (!list_empty(&rio->reserved_queue)) {
+		struct pcs_msg * msg = rio_dequeue_reserved_msg(rio);
+
+		pcs_msg_sent(msg);
+		pcs_set_local_error(&msg->error, rio->rio_error);
+		msg->done(msg);
+	}
+
+	while (!list_empty(&rio->active_txs)) {
+		struct rio_tx *tx = list_first_entry(&rio->active_txs,
+						     struct rio_tx, list);
+		struct pcs_msg *msg = tx->msg;
+		list_del(&tx->list);
+
+		BUG_ON(!msg);
+		rio_put_tx(rio->dev, tx);
+		pcs_set_local_error(&msg->error, rio->rio_error);
+		rio_msg_sent(rio, NULL, msg, 1);
+	}
+}
+
+static void rio_abort(struct pcs_rdmaio *rio, int error)
+{
+	struct pcs_netio *netio = &rio->netio;
+	struct ib_qp_attr qp_attr = { .qp_state = IB_QPS_ERR };
+
+	if (rio->rio_state == RIO_STATE_ABORTED) /* already handled  */
+		return;
+
+	rio->rio_state = RIO_STATE_ABORTED;
+	rio->rio_error = error;
+
+	if (rdma_disconnect(rio->cmid))
+		TRACE("rdma_disconnect failed: rio: 0x%p\n", rio);
+
+	if (ib_modify_qp(rio->cmid->qp, &qp_attr, IB_QP_STATE))
+		TRACE("ib_modify_qp failed: rio: 0x%p\n", rio);
+
+	if (netio->eof) {
+		void (*eof)(struct pcs_netio *) = netio->eof;
+		netio->eof = NULL;
+		(*eof)(netio);
+	}
+}
+
+static LLIST_HEAD(rio_destroy_list);
+
+static void rio_destroy(struct work_struct *work)
+{
+	struct llist_node *list = llist_del_all(&rio_destroy_list);
+	struct pcs_rdmaio *rio, *tmp;
+
+	if (unlikely(!list))
+		return;
+
+	llist_for_each_entry_safe(rio, tmp, list, destroy_node) {
+		struct pcs_rpc *ep = rio->netio.parent;
+
+		mutex_lock(&ep->mutex);
+
+		TRACE("rio: 0x%p\n", rio);
+
+		while (rio->n_rx_posted || rio->n_tx_posted) {
+			rio_req_notify_cq(rio);
+			wait_event_timeout(rio->waitq, rio_comp_perform(rio),
+					   ep->params.response_timeout);
+		}
+		rio_cleanup(rio);
+
+		rdma_destroy_qp(rio->cmid);
+		ib_destroy_cq(rio->cq);
+
+		BUG_ON(!list_empty(&rio->tx_jobs));
+		BUG_ON(!list_empty(&rio->write_queue));
+		BUG_ON(!list_empty(&rio->reserved_queue));
+		BUG_ON(!list_empty(&rio->active_txs));
+		BUG_ON(rio->dev->free_txs_cnt != rio->send_queue_depth);
+
+		pcs_rdma_device_destroy(rio->dev);
+		ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
+				     rio->rx_bufs, rio->rx_bufs_dma);
+		kfree(rio->rx_descs);
+
+		rdma_destroy_id(rio->cmid);
+
+		memset(rio, 0xFF, sizeof(*rio));
+		kfree(rio);
+
+		mutex_unlock(&ep->mutex);
+
+		pcs_rpc_put(ep);
+	}
+}
+
+static DECLARE_WORK(rio_destroy_work, rio_destroy);
+
+void pcs_rdma_destroy(struct pcs_rdmaio *rio)
+{
+	struct pcs_netio *netio = &rio->netio;
+	struct pcs_rpc *ep = netio->parent;
+
+	TRACE("rio: 0x%p\n", rio);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	netio->eof = NULL;
+	rio_abort(rio, PCS_ERR_NET_ABORT);
+
+	if (llist_add(&rio->destroy_node, &rio_destroy_list))
+		queue_work(pcs_cleanup_wq, &rio_destroy_work);
+}
+
+void pcs_rdma_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+	struct pcs_rdmaio *rio = rio_from_ioconn(ioconn);
+	struct pcs_rpc *ep = rio->netio.parent;
+
+	TRACE("rio: 0x%p\n", rio);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	BUG_ON(rio->rio_state != RIO_STATE_ABORTED);
+	if (llist_add(&rio->destroy_node, &rio_destroy_list))
+		queue_work(pcs_cleanup_wq, &rio_destroy_work);
+}
+
+static void pcs_rdma_throttle(struct pcs_netio *netio)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	TRACE("rio: 0x%p\n", rio);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->throttled || rio->rio_state != RIO_STATE_ESTABLISHED)
+		return;
+
+	rio->throttled = 1;
+}
+
+static void pcs_rdma_unthrottle(struct pcs_netio *netio)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	TRACE("rio: 0x%p\n", rio);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (!rio->throttled || rio->rio_state != RIO_STATE_ESTABLISHED)
+		return;
+
+	rio->throttled = 0;
+
+	if (!list_empty(&rio->pended_rxs))
+		rio_handle_pended_rxs(rio);
+}
+
+static void pcs_rdma_send_msg(struct pcs_netio *netio, struct pcs_msg *msg)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED) {
+		pcs_msg_sent(msg);
+		pcs_set_local_error(&msg->error, rio->rio_error);
+		msg->done(msg);
+		return;
+	}
+
+	msg->netio = netio;
+
+	list_add_tail(&msg->list, &rio->write_queue);
+	msg->start_time = jiffies;
+	msg->stage = PCS_MSG_STAGE_SEND;
+
+	rio_kick_write_queue(rio);
+}
+
+static int pcs_rdma_cancel_msg(struct pcs_msg *msg)
+{
+	struct pcs_rpc *ep = msg->netio->parent;
+
+	TRACE("msg: 0x%p\n", msg);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (list_empty(&msg->list))
+		return -EBUSY;
+
+	list_del_init(&msg->list);
+	msg->stage = PCS_MSG_STAGE_SENT;
+
+	return 0;
+}
+
+static void pcs_rdma_abort_io(struct pcs_netio *netio, int error)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	TRACE("rio: 0x%p\n", rio);
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED)
+		return;
+
+	netio->eof = NULL;
+	rio_abort(rio, error);
+}
+
+static void pcs_rdma_xmit(struct pcs_netio *netio)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED)
+		return;
+
+	if (test_bit(PCS_RDMA_IO_ERROR, &rio->io_flags))
+		rio_abort(rio, PCS_ERR_NET_ABORT);
+
+	while (test_and_clear_bit(PCS_RDMA_IO_CQE, &rio->io_flags)) {
+		do {
+			rio_comp_perform(rio);
+		} while (rio_req_notify_cq(rio) > 0);
+	}
+
+	rio_kick_write_queue(rio);
+}
+
+static int pcs_rdma_flush(struct pcs_netio *netio)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED)
+		return 0;
+
+	return test_bit(PCS_RDMA_IO_CQE, &rio->io_flags);
+}
+
+static unsigned long pcs_rdma_next_timeout(struct pcs_netio *netio)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+	struct pcs_msg *msg;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (list_empty(&rio->write_queue))
+		return 0;
+
+	msg = list_first_entry(&rio->write_queue, struct pcs_msg, list);
+	return msg->start_time + rio->send_timeout;
+}
+
+static int pcs_rdma_sync_send(struct pcs_netio *netio, struct pcs_msg *msg)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+	int credits = rio->n_os_credits;
+	struct pcs_remote_buf *rb;
+	struct pcs_rdma_ack *rack;
+	struct rio_tx *tx;
+	struct rio_cqe *cqe;
+	int hdr_len;
+	int ret;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED ||
+	    !rio->n_peer_credits)
+		return -EINVAL;
+
+	tx = RE_NULL(rio_get_tx(rio->dev));
+	if (!tx)
+		return -ENOMEM;
+
+	hdr_len = rio_init_msg(tx->buf, msg->size, credits, SUBMIT_REGULAR, &rb, &rack);
+	if (hdr_len + msg->size > RDMA_THRESHOLD) {
+		rio_put_tx(rio->dev, tx);
+		return -EINVAL;
+	}
+
+	memcpy(tx->buf + hdr_len, msg->_inline_buffer, msg->size);
+	rio_update_msg_immediate(tx->buf, msg->size);
+
+	ret = rio_tx_post_send(rio, tx, hdr_len + msg->size,
+			       NULL, NULL);
+	if (ret) {
+		rio_put_tx(rio->dev, tx);
+		return ret;
+	}
+	rio->n_os_credits -= credits;
+	rio->n_peer_credits--;
+
+	wait_event_timeout(rio->waitq, (cqe = rio_poll_cq(rio)),
+			   ep->params.connect_timeout);
+	if (rio_req_notify_cq(rio) > 0)
+		pcs_rpc_kick_queue(ep);
+
+	if (!cqe)
+		return -ETIMEDOUT;
+
+	ret = cqe == &tx->cqe && cqe->status == IB_WC_SUCCESS ?
+		0 : -EFAULT;
+	cqe->done(cqe, true);
+
+	return ret;
+}
+
+static int pcs_rdma_sync_recv(struct pcs_netio *netio, struct pcs_msg **msg)
+{
+	struct pcs_rdmaio *rio = rio_from_netio(netio);
+	struct pcs_rpc *ep = netio->parent;
+	struct rio_rx *rx;
+	struct rio_cqe *cqe;
+	char *buf;
+	int   type;
+	char *payload;
+	int payload_size;
+	int credits;
+	struct pcs_remote_buf *rb = NULL;
+	struct pcs_rdma_ack *rack;
+	int ret = 0;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	if (rio->rio_state != RIO_STATE_ESTABLISHED)
+		return -EINVAL;
+
+	wait_event_timeout(rio->waitq, (cqe = rio_poll_cq(rio)),
+			   ep->params.connect_timeout);
+	if (rio_req_notify_cq(rio) > 0)
+		pcs_rpc_kick_queue(ep);
+
+	if (!cqe)
+		return -ETIMEDOUT;
+
+	if (cqe->done != rio_rx_done || cqe->status != IB_WC_SUCCESS) {
+		ret = -EFAULT;
+		goto out;
+	}
+
+	rx = container_of(cqe, struct rio_rx, cqe);
+
+	buf = rx2buf(rio, rx);
+	type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+			     rio->queue_depth);
+	if (type != RIO_MSG_IMMEDIATE || rb) {
+		ret = -EFAULT;
+		goto out;
+	}
+	rio->n_peer_credits += credits;
+
+	*msg = pcs_rpc_alloc_output_msg(payload_size);
+	if (!*msg) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	memcpy((*msg)->_inline_buffer, payload, payload_size);
+
+out:
+	cqe->done(cqe, true);
+	return ret;
+}
+
+struct pcs_netio_tops pcs_rdma_netio_tops = {
+	.throttle		= pcs_rdma_throttle,
+	.unthrottle		= pcs_rdma_unthrottle,
+	.send_msg		= pcs_rdma_send_msg,
+	.cancel_msg		= pcs_rdma_cancel_msg,
+	.abort_io		= pcs_rdma_abort_io,
+	.xmit			= pcs_rdma_xmit,
+	.flush			= pcs_rdma_flush,
+	.next_timeout		= pcs_rdma_next_timeout,
+	.sync_send		= pcs_rdma_sync_send,
+	.sync_recv		= pcs_rdma_sync_recv,
+};
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.h b/fs/fuse/kio/pcs/pcs_rdma_io.h
new file mode 100644
index 000000000000..912be17e574d
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.h
@@ -0,0 +1,114 @@
+#ifndef _PCS_RMDA_IO_H_
+#define _PCS_RMDA_IO_H_ 1
+
+#include <linux/types.h>
+#include <linux/wait.h>
+#include <linux/workqueue.h>
+#include <linux/llist.h>
+
+#include <rdma/rdma_cm.h>
+
+#include "pcs_types.h"
+#include "pcs_rpc.h"
+#include "pcs_sock_io.h"
+#include "pcs_error.h"
+#include "pcs_net.h"
+#include "pcs_rdma_prot.h"
+#include "log.h"
+
+#define RIO_IB_WC_MAX 64
+
+enum {
+	RIO_STATE_CONNECTING,   /* needn't rdma_disconnect (yet) */
+	RIO_STATE_ESTABLISHED,  /* main "working" state */
+	RIO_STATE_ABORTED,      /* rio_abort was called at least once */
+};
+
+extern struct pcs_netio_tops pcs_rdma_netio_tops;
+
+struct pcs_rdma_id
+{
+	int (*event_handler)(struct rdma_cm_id *cmid, struct rdma_cm_event *event);
+};
+
+struct pcs_rdmaio
+{
+	/*
+	 * It is not immediately obvious, but we need two poll-able objects:
+	 * netio.iocomp and compc. The former handles the DISCONNECT event;
+	 * the latter handles WQE completion events. */
+	struct pcs_netio netio;
+	struct pcs_rdma_id id;
+
+	struct llist_node destroy_node;
+
+	wait_queue_head_t waitq;
+	unsigned long io_flags; /* atomic bit ops */
+
+	int rio_state; /* see enum above */
+	int rio_error;
+
+	int hdr_size;  /* minimum allowed payload */
+
+	/*
+	 * It's easier to have the same queue_depth for both directions.
+	 * rdma_connect gets the value from a tunable and sends it via
+	 * conn_param; rdma_listen sees it in the connection request event
+	 * and blindly accepts the value. */
+	int queue_depth;
+	int send_queue_depth;
+	int max_send_wr;
+
+	int send_timeout;
+
+	struct ib_wc wcs[RIO_IB_WC_MAX];
+	int wc_cnt;
+	int wc_idx;
+
+	struct list_head tx_jobs; /* list head of TX jobs */
+
+	struct rio_rx *rx_descs; /* plain array of RX descriptors */
+	char *rx_bufs;           /* MR-ed area for payload of RXs */
+	size_t rx_bufs_size;
+	dma_addr_t rx_bufs_dma;
+	struct list_head pended_rxs; /* list head of pended RX frames */
+
+	int n_rx_posted; /* # posted RXs */
+	int n_tx_posted; /* # posted TXs */
+
+	int n_peer_credits; /* what we think about peer's n_rx_posted */
+	int n_reserved_credits; /* limits # RDMA in flight */
+
+	int n_os_credits;   /* outstanding credits: # RXs we re-posted,
+			     * but have not returned to our peer (yet) */
+
+	int n_th_credits;   /* threshold: when to return outstanding
+			     * credits urgently */
+
+	struct pcs_rdma_device *dev;
+	struct rdma_cm_id *cmid;
+	struct ib_cq *cq;
+
+	struct list_head write_queue;
+	struct list_head reserved_queue; /* out of reserved credits */
+
+	int no_kick;   /* do not kick processing write_queue */
+	int throttled; /* pcs_rpc asked us to quiesce */
+
+	struct list_head active_txs; /* list head of active TX frames: tx->msg->done()
+				      * is postponed until ACK from our peer */
+
+	u64 xid_generator; /* provides unique (per rio) xids */
+
+	struct pcs_rdmaio_conn_req conn_req;
+};
+
+#define rio_from_netio(nio) container_of(nio, struct pcs_rdmaio, netio)
+#define rio_from_ioconn(conn) container_of(conn, struct pcs_rdmaio, netio.ioconn)
+
+struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
+		int queue_depth, struct pcs_rpc *ep);
+void pcs_rdma_destroy(struct pcs_rdmaio *rio);
+void pcs_rdma_ioconn_destruct(struct pcs_ioconn *ioconn);
+
+#endif /* _PCS_RMDA_IO_H_ */
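
The credit fields above implement the flow control between the two sides: n_peer_credits is our view of how many RXs the peer has posted, n_os_credits counts RXs we have re-posted but not yet advertised back, and n_th_credits is the threshold at which outstanding credits must be returned urgently. A minimal sketch of that bookkeeping, assuming a hypothetical rio_return_credits() helper that piggybacks the count on the next TX or on a RIO_MSG_NOOP:

	/* Hedged sketch of the credit accounting described by the fields above.
	 * rio_return_credits() is a hypothetical name for "advertise
	 * n_os_credits back to the peer". */
	static inline void example_rx_reposted(struct pcs_rdmaio *rio)
	{
		rio->n_os_credits++;			/* one more RX re-posted */
		if (rio->n_os_credits >= rio->n_th_credits)
			rio_return_credits(rio);	/* hypothetical helper */
	}

	static inline void example_credits_returned(struct pcs_rdmaio *rio, int credits)
	{
		rio->n_peer_credits += credits;		/* peer re-armed that many RXs */
	}
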
diff --git a/fs/fuse/kio/pcs/pcs_rdma_prot.h b/fs/fuse/kio/pcs/pcs_rdma_prot.h
new file mode 100644
index 000000000000..4a311cdb6f73
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_prot.h
@@ -0,0 +1,119 @@
+#ifndef _PCS_RDMA_PROT_H_
+#define _PCS_RDMA_PROT_H_ 1
+
+/* PCS RDMA network protocol v1 */
+
+#define RIO_MAGIC 0x5078614d
+#define RIO_VERSION 1
+
+#define RIO_MSG_SIZE	   (2*4096) /* max size of any wire message */
+
+#define RIO_QUEUE_DEPTH     8   /* TODO: make it tunable */
+#define RIO_MAX_QUEUE_DEPTH 128 /* for conn_param sanity checks */
+
+/* negotiated by rdma_connect/rdma_accept */
+struct pcs_rdmaio_conn_req {
+	u32 magic;       /* RIO_MAGIC */
+	u32 version;     /* RIO_VERSION */
+	u32 queue_depth; /* RIO_QUEUE_DEPTH */
+	u32 msg_size;    /* RIO_MSG_SIZE */
+} __attribute__((aligned(8)));
+
+/* negotiated by rdma_connect/rdma_accept */
+struct pcs_rdmaio_rej {
+	struct pcs_rdmaio_conn_req cr;
+	u32 error;       /* errno */
+} __attribute__((aligned(8)));
+
+/* "type" field of pcs_rdmaio_msg */
+enum {
+	RIO_MSG_IMMEDIATE = 0,
+	RIO_MSG_NOOP,
+	RIO_MSG_RDMA_READ_REQ,
+	RIO_MSG_RDMA_READ_ACK,
+	RIO_MAX_TYPE_VALUE
+};
+
+/* sent/received by rdma_post_send/rdma_post_recv */
+struct pcs_rdmaio_hdr {
+	u32 magic;       /* RIO_MAGIC */
+	u16 version;     /* RIO_VERSION */
+	u16 type;        /* RIO_MSG_IMMEDIATE/... */
+	u32 size;        /* total size of wire message */
+	u32 credits;     /* # credits to return */
+} __attribute__((aligned(8)));
+
+struct pcs_remote_buf {
+	u64 xid;
+	u64 rbuf;
+	u32 rkey;
+	u32 rlen;
+} __attribute__((aligned(8)));
+
+struct pcs_rdma_ack {
+	u64 xid;
+	u32 status;
+} __attribute__((aligned(8)));
+
+static inline int rio_parse_hdr(char *buf, char **payload, int *payload_size,
+				int *credits, struct pcs_remote_buf **rb,
+				struct pcs_rdma_ack **rack,
+				int queue_depth)
+{
+	struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+
+	if (hdr->magic != RIO_MAGIC) {
+		TRACE("wrong rio msg magic: 0x%x\n", hdr->magic);
+		return -1;
+	}
+
+	if (hdr->version != RIO_VERSION) {
+		TRACE("wrong rio msg version: 0x%x\n", hdr->version);
+		return -1;
+	}
+
+	if (hdr->type >= RIO_MAX_TYPE_VALUE) {
+		TRACE("wrong rio msg type: 0x%x\n", hdr->type);
+		return -1;
+	}
+
+	if (hdr->size > RIO_MSG_SIZE) {
+		TRACE("wrong rio msg size: 0x%x\n", hdr->size);
+		return -1;
+	}
+
+	if (hdr->credits > queue_depth) {
+		TRACE("wrong rio msg credits: 0x%x\n", hdr->credits);
+		return -1;
+	}
+
+	if (hdr->type == RIO_MSG_RDMA_READ_REQ &&
+	    hdr->size - sizeof(*hdr) < sizeof(struct pcs_remote_buf)) {
+		TRACE("short rdma read req: 0x%x\n", hdr->size);
+		return -1;
+	}
+
+	if (hdr->type == RIO_MSG_RDMA_READ_ACK &&
+	    hdr->size != sizeof(*hdr) + sizeof(struct pcs_rdma_ack)) {
+		TRACE("wrong size rdma read ack: 0x%x\n", hdr->size);
+		return -1;
+	}
+
+	*payload = buf + sizeof(*hdr);
+	*payload_size = hdr->size - sizeof(*hdr);
+	*credits = hdr->credits;
+
+	if (hdr->type == RIO_MSG_RDMA_READ_REQ) {
+		*rb = (struct pcs_remote_buf *)*payload;
+		*payload += sizeof(struct pcs_remote_buf);
+		*payload_size -= sizeof(struct pcs_remote_buf);
+	} else if (hdr->type == RIO_MSG_RDMA_READ_ACK) {
+		*rack = (struct pcs_rdma_ack *)*payload;
+		*payload += sizeof(struct pcs_rdma_ack);
+		*payload_size -= sizeof(struct pcs_rdma_ack);
+	}
+
+	return hdr->type;
+}
+
+#endif /* _PCS_RDMA_PROT_H_ */
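
Putting the definitions above together, a wire message is a pcs_rdmaio_hdr followed by an optional pcs_remote_buf (RDMA read request) or pcs_rdma_ack (read ack) and then the payload. Below is a minimal sketch of building an immediate message and parsing it back with rio_parse_hdr(); the buffer and payload are illustrative only.

	/* Hedged sketch: build a RIO_MSG_IMMEDIATE message in a local buffer
	 * and parse it back.  For immediate messages rb and rack stay NULL. */
	static int example_wire_roundtrip(void)
	{
		char buf[128];	/* illustrative; real RX buffers are RIO_MSG_SIZE */
		struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
		static const char payload[] = "ping";
		struct pcs_remote_buf *rb = NULL;
		struct pcs_rdma_ack *rack = NULL;
		char *out;
		int out_size, credits, type;

		hdr->magic   = RIO_MAGIC;
		hdr->version = RIO_VERSION;
		hdr->type    = RIO_MSG_IMMEDIATE;
		hdr->size    = sizeof(*hdr) + sizeof(payload);
		hdr->credits = 1;	/* must not exceed the negotiated queue_depth */
		memcpy(buf + sizeof(*hdr), payload, sizeof(payload));

		type = rio_parse_hdr(buf, &out, &out_size, &credits,
				     &rb, &rack, RIO_QUEUE_DEPTH);
		/* type == RIO_MSG_IMMEDIATE, out points past the header,
		 * out_size == sizeof(payload), credits == 1 */
		return type == RIO_MSG_IMMEDIATE ? 0 : -EINVAL;
	}
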
diff --git a/fs/fuse/kio/pcs/pcs_rdma_rw.c b/fs/fuse/kio/pcs/pcs_rdma_rw.c
new file mode 100644
index 000000000000..15659083b889
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_rw.c
@@ -0,0 +1,815 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+
+#include "pcs_rdma_rw.h"
+
+static int dma_dir_to_ib_reg_access(enum dma_data_direction dir)
+{
+	switch (dir) {
+		case DMA_BIDIRECTIONAL:
+			return IB_ACCESS_REMOTE_READ |
+			       IB_ACCESS_REMOTE_WRITE |
+			       IB_ACCESS_LOCAL_WRITE;
+		case DMA_TO_DEVICE:
+			return IB_ACCESS_REMOTE_READ;
+		case DMA_FROM_DEVICE:
+			return IB_ACCESS_REMOTE_WRITE |
+			       IB_ACCESS_LOCAL_WRITE;
+		default:
+			return 0;
+	}
+}
+
+static unsigned int dma_dir_to_it_dir(enum dma_data_direction dir)
+{
+	return dir == DMA_TO_DEVICE ? WRITE : READ;
+}
+
+int pcs_sgl_buf_init_from_msg(struct pcs_sgl_buf *sbuf, unsigned int dir, struct pcs_msg *msg,
+			      size_t offset, size_t end_offset, gfp_t gfp, bool allow_gaps)
+{
+	struct iov_iter it;
+	size_t msg_offset;
+	struct scatterlist *sg;
+	int ret, i;
+
+	if (offset >= end_offset || end_offset > msg->size)
+		return -EINVAL;
+
+	sbuf->pg_cnt = 0;
+	sbuf->sg_cnt = 0;
+	sbuf->page_clean = put_page;
+
+	iov_iter_kvec(&it, dir, NULL, 0, 0);
+	msg_offset = offset;
+	while (msg_offset < end_offset) {
+		size_t len;
+
+		if (!iov_iter_count(&it))
+			msg->get_iter(msg, msg_offset, &it, dir);
+
+		len = iov_iter_single_seg_count(&it);
+		if (len > end_offset - msg_offset)
+			len = end_offset - msg_offset;
+
+		iov_iter_advance(&it, len);
+		msg_offset += len;
+		sbuf->sg_cnt++;
+	}
+
+	sbuf->pages = RE_NULL(kzalloc(sizeof(struct page*) * sbuf->sg_cnt,
+				      gfp));
+	if (!sbuf->pages)
+		return -ENOMEM;
+
+	sbuf->sgl = RE_NULL(kzalloc(sizeof(struct scatterlist) * sbuf->sg_cnt,
+				    gfp));
+	if (!sbuf->sgl) {
+		ret = -ENOMEM;
+		goto fail_pg;
+	}
+	sg_init_table(sbuf->sgl, sbuf->sg_cnt);
+
+	iov_iter_kvec(&it, dir, NULL, 0, 0);
+	msg_offset = offset;
+	sg = sbuf->sgl;
+	while (msg_offset < end_offset) {
+		size_t off, len;
+
+		if (!iov_iter_count(&it))
+			msg->get_iter(msg, msg_offset, &it, dir);
+
+		len = iov_iter_get_pages(&it, sbuf->pages + sbuf->pg_cnt,
+					 PAGE_SIZE, 1, &off);
+		if (len <= 0) {
+			ret = -EINVAL;
+			goto fail_sgl;
+		}
+
+		if (len > end_offset - msg_offset)
+			len = end_offset - msg_offset;
+
+		sg_set_page(sg, sbuf->pages[sbuf->pg_cnt++], len, off);
+
+		if (!allow_gaps &&
+		    ((msg_offset != offset && !IS_ALIGNED(off, PAGE_SIZE)) ||
+		     ((msg_offset + len != end_offset) && !IS_ALIGNED(off + len, PAGE_SIZE)))) {
+			ret = -EINVAL;
+			goto fail_sgl;
+		}
+
+		sg = sg_next(sg);
+		iov_iter_advance(&it, len);
+		msg_offset += len;
+	}
+
+	return 0;
+
+fail_sgl:
+	kfree(sbuf->sgl);
+fail_pg:
+	for (i = 0; i < sbuf->pg_cnt; i++)
+		sbuf->page_clean(sbuf->pages[i]);
+	kfree(sbuf->pages);
+
+	return ret;
+}
+
+static void pcs_sgl_buf_free_page(struct page *page)
+{
+	__free_page(page);
+}
+
+int pcs_sgl_buf_init(struct pcs_sgl_buf *sbuf, size_t size, gfp_t gfp)
+{
+	struct scatterlist *sg;
+	int ret, i;
+
+	if (size == 0)
+		return -EINVAL;
+
+	sbuf->pg_cnt = 0;
+	sbuf->sg_cnt = PAGE_ALIGN(size) >> PAGE_SHIFT;
+	sbuf->page_clean = pcs_sgl_buf_free_page;
+
+	sbuf->pages = RE_NULL(kzalloc(sizeof(struct page*) * sbuf->sg_cnt,
+				      gfp));
+	if (!sbuf->pages)
+		return -ENOMEM;
+
+	sbuf->sgl = RE_NULL(kzalloc(sizeof(struct scatterlist) * sbuf->sg_cnt,
+				    gfp));
+	if (!sbuf->sgl) {
+		ret = -ENOMEM;
+		goto fail_pg;
+	}
+	sg_init_table(sbuf->sgl, sbuf->sg_cnt);
+
+	for_each_sg(sbuf->sgl, sg, sbuf->sg_cnt, i) {
+		size_t sg_len = min_t(size_t, size, PAGE_SIZE);
+		sbuf->pages[sbuf->pg_cnt] = RE_NULL(alloc_page(gfp));
+		if (!sbuf->pages[sbuf->pg_cnt]) {
+			ret = -ENOMEM;
+			goto fail_sgl;
+		}
+		BUG_ON(!sg_len);
+		sg_set_page(sg, sbuf->pages[sbuf->pg_cnt], sg_len, 0);
+		size -= sg_len;
+		sbuf->pg_cnt++;
+	}
+
+	return 0;
+
+fail_sgl:
+	kfree(sbuf->sgl);
+fail_pg:
+	for (i = 0; i < sbuf->pg_cnt; i++)
+		sbuf->page_clean(sbuf->pages[i]);
+	kfree(sbuf->pages);
+
+	return ret;
+}
+
+void pcs_sgl_buf_destroy(struct pcs_sgl_buf *sbuf)
+{
+	int i;
+
+	kfree(sbuf->sgl);
+
+	for (i = 0; i < sbuf->pg_cnt; i++)
+		sbuf->page_clean(sbuf->pages[i]);
+	kfree(sbuf->pages);
+}
+
+int pcs_ib_mr_pool_init(struct pcs_ib_mr_pool *pool, struct ib_pd *pd,
+			enum ib_mr_type mr_type, u32 max_num_sg, size_t mr_cnt)
+{
+	struct ib_mr *mr;
+	int ret, i;
+
+	pool->pd = pd;
+	pool->mr_type = mr_type;
+	pool->max_num_sg = max_num_sg;
+	pool->mr_cnt = mr_cnt;
+
+	spin_lock_init(&pool->lock);
+
+	INIT_LIST_HEAD(&pool->mr_list);
+	pool->used_mrs = 0;
+
+	for (i = 0; i < mr_cnt; i++) {
+		mr = RE_PTR_INV(ib_alloc_mr(pd, mr_type, max_num_sg));
+		if (IS_ERR(mr)) {
+			ret = PTR_ERR(mr);
+			goto fail;
+		}
+		list_add_tail(&mr->qp_entry, &pool->mr_list);
+	}
+
+	return 0;
+
+fail:
+	pcs_ib_mr_pool_destroy(pool);
+	return ret;
+}
+
+void pcs_ib_mr_pool_destroy(struct pcs_ib_mr_pool *pool)
+{
+	struct ib_mr *mr, *tmp;
+
+	spin_lock_irq(&pool->lock);
+	BUG_ON(pool->used_mrs);
+	spin_unlock_irq(&pool->lock);
+
+	list_for_each_entry_safe(mr, tmp, &pool->mr_list, qp_entry)
+		ib_dereg_mr(mr);
+}
+
+struct ib_mr* pcs_ib_mr_pool_get(struct pcs_ib_mr_pool *pool)
+{
+	struct ib_mr *mr;
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->lock, flags);
+	mr = list_first_entry_or_null(&pool->mr_list, struct ib_mr, qp_entry);
+	if (mr) {
+		list_del(&mr->qp_entry);
+		BUG_ON(pool->used_mrs >= pool->mr_cnt);
+		pool->used_mrs++;
+	}
+	spin_unlock_irqrestore(&pool->lock, flags);
+
+	if (!mr && !in_interrupt()) {
+		mr = RE_PTR_INV(ib_alloc_mr(pool->pd, pool->mr_type,
+					    pool->max_num_sg));
+		if (IS_ERR(mr))
+			return NULL;
+
+		spin_lock_irqsave(&pool->lock, flags);
+		pool->mr_cnt++;
+		BUG_ON(pool->used_mrs >= pool->mr_cnt);
+		pool->used_mrs++;
+		spin_unlock_irqrestore(&pool->lock, flags);
+	}
+
+	return mr;
+}
+
+void pcs_ib_mr_pool_put(struct pcs_ib_mr_pool *pool, struct ib_mr *mr)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->lock, flags);
+	list_add(&mr->qp_entry, &pool->mr_list);
+	BUG_ON(!pool->used_mrs);
+	pool->used_mrs--;
+	spin_unlock_irqrestore(&pool->lock, flags);
+}
+
+int pcs_rdma_rw_init_from_msg(struct pcs_rdma_rw *rw, struct ib_device *dev,
+			      enum dma_data_direction dir, u64 remote_addr, u32 rkey,
+			      u32 local_dma_lkey, struct pcs_msg *msg, size_t offset,
+			      size_t end_offset, gfp_t gfp, size_t sge_per_wr)
+{
+	struct scatterlist *sg;
+	struct ib_sge *sge;
+	size_t sge_cnt;
+	int ret, i, k;
+
+	if (dir != DMA_TO_DEVICE && dir != DMA_FROM_DEVICE)
+		return -EINVAL;
+
+	rw->dev = dev;
+	rw->dir = dir;
+
+	ret = pcs_sgl_buf_init_from_msg(&rw->sbuf, dma_dir_to_it_dir(dir), msg, offset, end_offset,
+					gfp, true);
+	if (ret)
+		return ret;
+
+	ret = RE_INV(ib_dma_map_sg(dev, rw->sbuf.sgl, rw->sbuf.sg_cnt,
+				   dir));
+	if (ret <= 0) {
+		ret = ret < 0 ? ret : -EIO;
+		goto fail_sgl;
+	}
+	rw->sbuf.sg_cnt = ret;
+
+	rw->nr_wrs = DIV_ROUND_UP(rw->sbuf.sg_cnt, sge_per_wr);
+
+	rw->sges = RE_NULL(kzalloc(sizeof(struct ib_sge) * rw->sbuf.sg_cnt, gfp));
+	if (!rw->sges) {
+		ret = -ENOMEM;
+		goto fail_dma;
+	}
+
+	rw->wrs = RE_NULL(kzalloc(sizeof(struct ib_rdma_wr) * rw->nr_wrs, gfp));
+	if (!rw->wrs) {
+		ret = -ENOMEM;
+		goto fail_sges;
+	}
+
+	sg = rw->sbuf.sgl;
+	sge = rw->sges;
+	sge_cnt = rw->sbuf.sg_cnt;
+	for (i = 0; i < rw->nr_wrs; i++) {
+		struct ib_rdma_wr *wr = rw->wrs + i;
+		int num_sge = min_t(size_t, sge_cnt, sge_per_wr);
+
+		wr->wr.opcode = dir == DMA_FROM_DEVICE ? IB_WR_RDMA_READ :
+			IB_WR_RDMA_WRITE;
+		wr->wr.sg_list = sge;
+		wr->wr.num_sge = num_sge;
+		wr->remote_addr = remote_addr;
+		wr->rkey = rkey;
+
+		for (k = 0; k < num_sge; k++, sg = sg_next(sg)) {
+			sge->addr = sg_dma_address(sg);
+			sge->length = sg_dma_len(sg);
+			sge->lkey = local_dma_lkey;
+
+			remote_addr += sge->length;
+			sge++;
+			sge_cnt--;
+		}
+
+		if (i > 0)
+			rw->wrs[i - 1].wr.next = &wr->wr;
+	}
+
+	return 0;
+
+fail_sges:
+	kfree(rw->sges);
+fail_dma:
+	ib_dma_unmap_sg(dev, rw->sbuf.sgl, rw->sbuf.sg_cnt, dir);
+fail_sgl:
+	pcs_sgl_buf_destroy(&rw->sbuf);
+
+	return ret;
+}
+
+void pcs_rdma_rw_destroy(struct pcs_rdma_rw *rw)
+{
+	kfree(rw->wrs);
+	kfree(rw->sges);
+	ib_dma_unmap_sg(rw->dev, rw->sbuf.sgl, rw->sbuf.sg_cnt, rw->dir);
+	pcs_sgl_buf_destroy(&rw->sbuf);
+}
+
+static int pcs_rdma_mr_init_common(struct pcs_rdma_mr *mr, struct ib_device *dev,
+				   struct ib_pd *pd, enum dma_data_direction dir, size_t size,
+				   struct pcs_ib_mr_pool *ib_mr_pool)
+{
+	unsigned long dma_align = dma_get_cache_alignment();
+	struct scatterlist *sg;
+	int ret, i;
+
+	/* For testing fallback */
+	RE_SET(ib_mr_pool, NULL);
+
+	/* Only cache aligned DMA transfers are reliable */
+	for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i)
+		if (!IS_ALIGNED((uintptr_t)sg_virt(sg), dma_align) ||
+				!IS_ALIGNED((uintptr_t)(sg_virt(sg) + sg_dma_len(sg)),
+					    dma_align))
+			return -EINVAL;
+
+	INIT_LIST_HEAD(&mr->entry);
+	mr->dev = dev;
+	mr->pd = pd;
+	mr->dir = dir;
+	mr->size = size;
+	mr->ib_mr_pool = ib_mr_pool && ib_mr_pool->mr_type == IB_MR_TYPE_MEM_REG &&
+		ib_mr_pool->max_num_sg >= mr->sbuf.sg_cnt ? ib_mr_pool : NULL;
+
+	ret = RE_INV(ib_dma_map_sg(dev, mr->sbuf.sgl, mr->sbuf.sg_cnt,
+				   dir));
+	if (ret <= 0)
+		return ret < 0 ? ret : -EIO;
+	mr->sbuf.sg_cnt = ret;
+
+	if (mr->ib_mr_pool) {
+		mr->mr = RE_NULL(pcs_ib_mr_pool_get(mr->ib_mr_pool));
+		if (!mr->mr) {
+			ret = -ENOMEM;
+			goto fail_dma;
+		}
+	} else { /* fallback */
+		mr->mr = RE_PTR_INV(ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG,
+						mr->sbuf.sg_cnt));
+		if (IS_ERR(mr->mr)) {
+			ret = PTR_ERR(mr->mr);
+			goto fail_dma;
+		}
+	}
+
+	ret = RE_INV(ib_map_mr_sg(mr->mr, mr->sbuf.sgl, mr->sbuf.sg_cnt,
+				  NULL, PAGE_SIZE));
+	if (ret != mr->sbuf.sg_cnt) {
+		ret = ret < 0 ? ret : -EIO;
+		goto fail_mr;
+	}
+
+	memset(&mr->inv_wr, 0, sizeof(mr->inv_wr));
+	mr->inv_wr.next = &mr->reg_wr.wr;
+	mr->inv_wr.opcode = IB_WR_LOCAL_INV;
+	mr->inv_wr.ex.invalidate_rkey = mr->mr->lkey;
+
+	ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->lkey));
+
+	memset(&mr->reg_wr, 0, sizeof(mr->reg_wr));
+	mr->reg_wr.wr.opcode = IB_WR_REG_MR;
+	mr->reg_wr.mr = mr->mr;
+	mr->reg_wr.key = mr->mr->lkey;
+	mr->reg_wr.access = dma_dir_to_ib_reg_access(dir);
+
+	mr->first_wr = mr->mr->need_inval ? &mr->inv_wr : &mr->reg_wr.wr;
+	mr->last_wr = &mr->reg_wr.wr;
+	mr->mr->need_inval = true;
+
+	return 0;
+
+fail_mr:
+	if (mr->ib_mr_pool)
+		pcs_ib_mr_pool_put(mr->ib_mr_pool, mr->mr);
+	else /* fallback */
+		ib_dereg_mr(mr->mr);
+fail_dma:
+	ib_dma_unmap_sg(dev, mr->sbuf.sgl, mr->sbuf.sg_cnt, dir);
+
+	return ret;
+}
+
+int pcs_rdma_mr_init_from_msg(struct pcs_rdma_mr *mr, struct ib_device *dev,
+			      struct ib_pd *pd, enum dma_data_direction dir, struct pcs_msg *msg,
+			      size_t offset, size_t end_offset, gfp_t gfp,
+			      struct pcs_ib_mr_pool *ib_mr_pool)
+{
+	int ret;
+
+	ret = pcs_sgl_buf_init_from_msg(&mr->sbuf, dma_dir_to_it_dir(dir), msg, offset, end_offset,
+					gfp, false);
+	if (ret)
+		return ret;
+
+	ret = pcs_rdma_mr_init_common(mr, dev, pd, dir, end_offset - offset,
+				      ib_mr_pool);
+	if (ret)
+		pcs_sgl_buf_destroy(&mr->sbuf);
+
+	return ret;
+}
+
+int pcs_rdma_mr_init(struct pcs_rdma_mr *mr, struct ib_device *dev, struct ib_pd *pd,
+		     enum dma_data_direction dir, size_t size, gfp_t gfp,
+		     struct pcs_ib_mr_pool *ib_mr_pool)
+{
+	int ret;
+
+	ret = pcs_sgl_buf_init(&mr->sbuf, size, gfp);
+	if (ret)
+		return ret;
+
+	ret = pcs_rdma_mr_init_common(mr, dev, pd, dir, size, ib_mr_pool);
+	if (ret)
+		pcs_sgl_buf_destroy(&mr->sbuf);
+
+	return ret;
+}
+
+void pcs_rdma_mr_destroy(struct pcs_rdma_mr *mr)
+{
+	if (mr->ib_mr_pool)
+		pcs_ib_mr_pool_put(mr->ib_mr_pool, mr->mr);
+	else /* fallback */
+		ib_dereg_mr(mr->mr);
+	ib_dma_unmap_sg(mr->dev, mr->sbuf.sgl, mr->sbuf.sg_cnt, mr->dir);
+	pcs_sgl_buf_destroy(&mr->sbuf);
+}
+
+struct pcs_rdma_mr* pcs_rdma_mr_alloc(struct ib_device *dev, struct ib_pd *pd,
+				      enum dma_data_direction dir, size_t size, gfp_t gfp,
+				      struct pcs_ib_mr_pool *ib_mr_pool)
+{
+	struct pcs_rdma_mr *mr;
+	int ret;
+
+	mr = RE_NULL(kzalloc(sizeof(*mr), gfp));
+	if (!mr)
+		return ERR_PTR(-ENOMEM);
+
+	ret = pcs_rdma_mr_init(mr, dev, pd, dir, size, gfp, ib_mr_pool);
+	if (ret) {
+		kfree(mr);
+		return ERR_PTR(ret);
+	}
+
+	return mr;
+}
+
+void pcs_rdma_mr_free(struct pcs_rdma_mr *mr)
+{
+	pcs_rdma_mr_destroy(mr);
+	kfree(mr);
+}
+
+void pcs_rdma_mr_sync_for_cpu(struct pcs_rdma_mr *mr, size_t size)
+{
+	struct scatterlist *sg;
+	unsigned int i;
+
+	for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i) {
+		size_t sg_len = min_t(size_t, size, sg_dma_len(sg));
+		ib_dma_sync_single_for_cpu(mr->dev,
+					   sg_dma_address(sg),
+					   sg_len,
+					   mr->dir);
+		size -= sg_len;
+		if (!size)
+			break;
+	}
+}
+
+void pcs_rdma_mr_sync_for_device(struct pcs_rdma_mr *mr, size_t size)
+{
+	struct scatterlist *sg;
+	unsigned int i;
+
+	for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i) {
+		size_t sg_len = min_t(size_t, size, sg_dma_len(sg));
+		ib_dma_sync_single_for_device(mr->dev,
+					      sg_dma_address(sg),
+					      sg_len,
+					      mr->dir);
+		size -= sg_len;
+		if (!size)
+			break;
+	}
+}
+
+int pcs_rdma_mr_sync_for_msg(struct pcs_rdma_mr *mr, struct pcs_msg *msg,
+			     size_t offset, size_t end_offset, bool to_msg)
+{
+	struct iov_iter it;
+	struct scatterlist *mr_sg;
+	size_t mr_off;
+	unsigned int dir = to_msg ? READ : WRITE;
+
+	if (offset >= end_offset || end_offset > msg->size ||
+	    end_offset - offset > mr->size)
+		return -EINVAL;
+
+	iov_iter_kvec(&it, dir, NULL, 0, 0);
+
+	mr_sg = mr->sbuf.sgl;
+	mr_off = 0;
+
+	while (offset < end_offset) {
+		size_t msg_off, msg_len, res;
+
+		if (!iov_iter_count(&it))
+			msg->get_iter(msg, offset, &it, dir);
+
+		msg_len = iov_iter_count(&it);
+		if (msg_len > end_offset - offset)
+			msg_len = end_offset - offset;
+
+		msg_off = 0;
+		while (msg_off < msg_len) {
+			void *mr_data = sg_virt(mr_sg);
+			size_t chunk_size = min_t(size_t, msg_len - msg_off, PAGE_SIZE - mr_off);
+
+			if (to_msg)
+				res = copy_to_iter(mr_data + mr_off, chunk_size, &it);
+			else
+				res = copy_from_iter(mr_data + mr_off, chunk_size, &it);
+			BUG_ON(res != chunk_size);
+
+			mr_off += chunk_size;
+			msg_off += chunk_size;
+			if (mr_off == PAGE_SIZE) {
+				mr_sg = sg_next(mr_sg);
+				mr_off = 0;
+			}
+		}
+
+		offset += msg_len;
+	}
+
+	return 0;
+}
+
+int pcs_rdma_mr_pool_init(struct pcs_rdma_mr_pool *pool, size_t mr_size,
+			  size_t mr_cnt, struct ib_device *dev, struct ib_pd *pd,
+			  enum dma_data_direction dir, gfp_t gfp, struct pcs_ib_mr_pool *ib_mr_pool)
+{
+	struct pcs_rdma_mr *mr;
+	int ret, i;
+
+	pool->mr_size = mr_size;
+	pool->mr_cnt = mr_cnt;
+	pool->dev = dev;
+	pool->pd = pd;
+	pool->dir = dir;
+	pool->gfp = gfp;
+	pool->ib_mr_pool = ib_mr_pool;
+
+	spin_lock_init(&pool->lock);
+
+	INIT_LIST_HEAD(&pool->mr_list);
+	pool->used_mrs = 0;
+
+	for (i = 0; i < mr_cnt; i++) {
+		mr = pcs_rdma_mr_alloc(dev, pd, dir, mr_size, gfp, ib_mr_pool);
+		if (IS_ERR(mr)) {
+			ret = PTR_ERR(mr);
+			goto fail;
+		}
+		list_add_tail(&mr->entry, &pool->mr_list);
+	}
+
+	return 0;
+
+fail:
+	pcs_rdma_mr_pool_destroy(pool);
+	return ret;
+}
+
+void pcs_rdma_mr_pool_destroy(struct pcs_rdma_mr_pool *pool)
+{
+	struct pcs_rdma_mr *mr, *tmp;
+
+	spin_lock_irq(&pool->lock);
+	BUG_ON(pool->used_mrs);
+	spin_unlock_irq(&pool->lock);
+
+	list_for_each_entry_safe(mr, tmp, &pool->mr_list, entry)
+		pcs_rdma_mr_free(mr);
+}
+
+struct pcs_rdma_mr* pcs_rdma_mr_pool_get(struct pcs_rdma_mr_pool *pool)
+{
+	struct pcs_rdma_mr *mr;
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->lock, flags);
+	mr = list_first_entry_or_null(&pool->mr_list, struct pcs_rdma_mr, entry);
+	if (mr) {
+		list_del(&mr->entry);
+		BUG_ON(pool->used_mrs >= pool->mr_cnt);
+		pool->used_mrs++;
+	}
+	spin_unlock_irqrestore(&pool->lock, flags);
+
+	if (!mr && !in_interrupt()) {
+		mr = pcs_rdma_mr_alloc(pool->dev, pool->pd, pool->dir,
+				       pool->mr_size, pool->gfp,
+				       pool->ib_mr_pool);
+		if (IS_ERR(mr))
+			return NULL;
+
+		spin_lock_irqsave(&pool->lock, flags);
+		pool->mr_cnt++;
+		BUG_ON(pool->used_mrs >= pool->mr_cnt);
+		pool->used_mrs++;
+		spin_unlock_irqrestore(&pool->lock, flags);
+	}
+
+	return mr;
+}
+
+void pcs_rdma_mr_pool_put(struct pcs_rdma_mr_pool *pool, struct pcs_rdma_mr *mr)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->lock, flags);
+	list_add(&mr->entry, &pool->mr_list);
+	BUG_ON(!pool->used_mrs);
+	pool->used_mrs--;
+	spin_unlock_irqrestore(&pool->lock, flags);
+}
+
+static void pcs_rdma_msg_cleanup_map(struct pcs_rdma_msg *rdma_msg)
+{
+	pcs_rdma_mr_destroy(rdma_msg->mr);
+}
+
+static int pcs_rdma_msg_init_map(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+				 size_t offset, size_t end_offset,
+				 struct pcs_rdma_mr_pool *pool)
+{
+	int ret;
+
+	if (offset >= end_offset || end_offset > msg->size)
+		return -EINVAL;
+
+	rdma_msg->msg = msg;
+	rdma_msg->offset = offset;
+	rdma_msg->end_offset = end_offset;
+
+	rdma_msg->pool = NULL;
+	rdma_msg->cleanup = pcs_rdma_msg_cleanup_map;
+
+	rdma_msg->first_wr = NULL;
+	rdma_msg->last_wr = NULL;
+
+	rdma_msg->mr = &rdma_msg->_inline_mr;
+	ret = pcs_rdma_mr_init_from_msg(rdma_msg->mr, pool->dev, pool->pd, pool->dir,
+					msg, offset, end_offset, pool->gfp, pool->ib_mr_pool);
+	if (ret)
+		return ret;
+
+	rdma_msg->lkey = rdma_msg->mr->mr->lkey;
+	rdma_msg->rkey = rdma_msg->mr->mr->rkey;
+	rdma_msg->iova = rdma_msg->mr->mr->iova;
+
+	if (rdma_msg->mr->first_wr && rdma_msg->mr->last_wr) {
+		rdma_msg->first_wr = rdma_msg->mr->first_wr;
+		rdma_msg->last_wr = rdma_msg->mr->last_wr;
+		rdma_msg->mr->first_wr = NULL;
+		rdma_msg->mr->last_wr = NULL;
+	}
+
+	return 0;
+}
+
+static void pcs_rdma_msg_cleanup(struct pcs_rdma_msg *rdma_msg)
+{
+	pcs_rdma_mr_sync_for_cpu(rdma_msg->mr,
+				 PAGE_ALIGN(rdma_msg->end_offset - rdma_msg->offset));
+	if (rdma_msg->mr->dir == DMA_BIDIRECTIONAL ||
+	    rdma_msg->mr->dir == DMA_FROM_DEVICE)
+		BUG_ON(pcs_rdma_mr_sync_for_msg(rdma_msg->mr, rdma_msg->msg,
+						rdma_msg->offset,
+						rdma_msg->end_offset, true));
+
+	if (rdma_msg->pool)
+		pcs_rdma_mr_pool_put(rdma_msg->pool, rdma_msg->mr);
+	else /* fallback */
+		pcs_rdma_mr_destroy(rdma_msg->mr);
+}
+
+int pcs_rdma_msg_init(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+		      size_t offset, size_t end_offset, struct pcs_rdma_mr_pool *pool,
+		      bool try_to_map)
+{
+	size_t mr_size = PAGE_ALIGN(end_offset - offset);
+	int ret;
+
+	if (offset >= end_offset || end_offset > msg->size)
+		return -EINVAL;
+
+	if (try_to_map && !pcs_rdma_msg_init_map(rdma_msg, msg, offset, end_offset, pool))
+		return 0;
+
+	rdma_msg->msg = msg;
+	rdma_msg->offset = offset;
+	rdma_msg->end_offset = end_offset;
+
+	rdma_msg->pool = mr_size > pool->mr_size ? NULL : pool;
+	rdma_msg->cleanup = pcs_rdma_msg_cleanup;
+
+	/* For testing fallback */
+	RE_SET(rdma_msg->pool, NULL);
+
+	rdma_msg->first_wr = NULL;
+	rdma_msg->last_wr = NULL;
+
+	if (rdma_msg->pool) {
+		rdma_msg->mr = RE_NULL(pcs_rdma_mr_pool_get(rdma_msg->pool));
+		if (!rdma_msg->mr)
+			return -ENOMEM;
+	} else { /* fallback */
+		rdma_msg->mr = &rdma_msg->_inline_mr;
+		ret = pcs_rdma_mr_init(rdma_msg->mr, pool->dev, pool->pd, pool->dir,
+				       mr_size, pool->gfp, pool->ib_mr_pool);
+		if (ret)
+			return ret;
+	}
+
+	if (rdma_msg->mr->dir == DMA_BIDIRECTIONAL ||
+	    rdma_msg->mr->dir == DMA_TO_DEVICE)
+		BUG_ON(pcs_rdma_mr_sync_for_msg(rdma_msg->mr, msg, offset,
+						end_offset, false));
+	pcs_rdma_mr_sync_for_device(rdma_msg->mr, mr_size);
+
+	rdma_msg->lkey = rdma_msg->mr->mr->lkey;
+	rdma_msg->rkey = rdma_msg->mr->mr->rkey;
+	rdma_msg->iova = rdma_msg->mr->mr->iova;
+
+	if (rdma_msg->mr->first_wr && rdma_msg->mr->last_wr) {
+		rdma_msg->first_wr = rdma_msg->mr->first_wr;
+		rdma_msg->last_wr = rdma_msg->mr->last_wr;
+		rdma_msg->mr->first_wr = NULL;
+		rdma_msg->mr->last_wr = NULL;
+	}
+
+	return 0;
+}
+
+void pcs_rdma_msg_destroy(struct pcs_rdma_msg *rdma_msg)
+{
+	rdma_msg->cleanup(rdma_msg);
+}
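
The pool helpers above follow the same get/put pattern for both raw ib_mr objects and the higher-level pcs_rdma_mr wrappers: get takes an MR off the free list (growing the pool outside interrupt context if it is empty), put returns it. A minimal sketch against pcs_ib_mr_pool, with the protection domain and sizing constants as placeholders:

	/* Hedged sketch: init/get/put/destroy cycle on a pcs_ib_mr_pool.
	 * The pd and the sizes are placeholders. */
	static int example_ib_mr_pool_use(struct ib_pd *pd)
	{
		struct pcs_ib_mr_pool pool;
		struct ib_mr *mr;
		int ret;

		ret = pcs_ib_mr_pool_init(&pool, pd, IB_MR_TYPE_MEM_REG,
					  16 /* max_num_sg */, 4 /* mr_cnt */);
		if (ret)
			return ret;

		mr = pcs_ib_mr_pool_get(&pool);	/* may grow the pool if empty */
		if (mr) {
			/* ... post an IB_WR_REG_MR and use mr->lkey here ... */
			pcs_ib_mr_pool_put(&pool, mr);
		}

		pcs_ib_mr_pool_destroy(&pool);
		return mr ? 0 : -ENOMEM;
	}
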
diff --git a/fs/fuse/kio/pcs/pcs_rdma_rw.h b/fs/fuse/kio/pcs/pcs_rdma_rw.h
new file mode 100644
index 000000000000..22ea59d02baf
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_rw.h
@@ -0,0 +1,184 @@
+#ifndef _PCS_RMDA_RW_H_
+#define _PCS_RMDA_RW_H_ 1
+
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/scatterlist.h>
+#include <linux/dma-direction.h>
+
+#include <rdma/ib_verbs.h>
+
+#include "pcs_sock_io.h"
+
+#if defined(CONFIG_DEBUG_KERNEL) && defined(CONFIG_FUSE_KIO_DEBUG)
+extern u32 rdmaio_io_failing;
+#define RE_FAIL() \
+({ \
+	static atomic_t __fail_cnt = ATOMIC_INIT(1); \
+	static atomic_t __fail_hop = ATOMIC_INIT(1); \
+	bool __ret = false; \
+	if (rdmaio_io_failing && atomic_dec_return(&__fail_cnt) <= 0) { \
+		atomic_add(atomic_inc_return(&__fail_hop), &__fail_cnt); \
+		TRACE("RE: fail!!!\n"); \
+		__ret = true; \
+	} \
+	__ret; \
+})
+#define RE(func, err) (RE_FAIL() ? err : (func))
+#define RE_PTR(func, err) (RE_FAIL() ? ERR_PTR(err) : (func))
+#define RE_NULL(func) (RE_FAIL() ? NULL : (func))
+#define RE_SET(var, val) { if (RE_FAIL()) var = val; }
+#else
+#define RE(func, err) func
+#define RE_PTR(func, err) func
+#define RE_NULL(func) func
+#define RE_SET(var, val)
+#endif
+
+#define RE_INV(func) RE(func, -EINVAL)
+#define RE_PTR_INV(func) RE_PTR(func, -EINVAL)
+
+struct pcs_sgl_buf
+{
+	struct page **pages;
+	size_t pg_cnt;
+	void (*page_clean)(struct page *page);
+
+	struct scatterlist *sgl;
+	size_t sg_cnt;
+};
+
+int pcs_sgl_buf_init_from_msg(struct pcs_sgl_buf *sbuf, unsigned int dir, struct pcs_msg *msg,
+			      size_t offset, size_t end_offset, gfp_t gfp, bool allow_gaps);
+int pcs_sgl_buf_init(struct pcs_sgl_buf *sbuf, size_t size, gfp_t gfp);
+void pcs_sgl_buf_destroy(struct pcs_sgl_buf *sbuf);
+
+struct pcs_ib_mr_pool
+{
+	struct ib_pd *pd;
+	enum ib_mr_type mr_type;
+	u32 max_num_sg;
+	size_t mr_cnt;
+
+	spinlock_t lock;
+
+	struct list_head mr_list;
+	size_t used_mrs;
+};
+
+int pcs_ib_mr_pool_init(struct pcs_ib_mr_pool *pool, struct ib_pd *pd,
+			enum ib_mr_type mr_type, u32 max_num_sg, size_t mr_cnt);
+void pcs_ib_mr_pool_destroy(struct pcs_ib_mr_pool *pool);
+
+struct ib_mr* pcs_ib_mr_pool_get(struct pcs_ib_mr_pool *pool);
+void pcs_ib_mr_pool_put(struct pcs_ib_mr_pool *pool, struct ib_mr *mr);
+
+struct pcs_rdma_rw
+{
+	struct ib_device *dev;
+	enum dma_data_direction dir;
+
+	struct pcs_sgl_buf sbuf;
+
+	struct ib_sge *sges;
+	struct ib_rdma_wr *wrs;
+	size_t nr_wrs;
+};
+
+int pcs_rdma_rw_init_from_msg(struct pcs_rdma_rw *rw, struct ib_device *dev,
+			      enum dma_data_direction dir, u64 remote_addr, u32 rkey,
+			      u32 local_dma_lkey, struct pcs_msg *msg, size_t offset,
+			      size_t end_offset, gfp_t gfp, size_t sge_per_wr);
+void pcs_rdma_rw_destroy(struct pcs_rdma_rw *rw);
+
+struct pcs_rdma_mr
+{
+	struct list_head entry;
+
+	struct ib_device *dev;
+	struct ib_pd *pd;
+	enum dma_data_direction dir;
+	size_t size;
+	struct pcs_ib_mr_pool *ib_mr_pool;
+
+	struct pcs_sgl_buf sbuf;
+	struct ib_mr *mr;
+
+	struct ib_send_wr inv_wr;
+	struct ib_reg_wr reg_wr;
+
+	struct ib_send_wr *first_wr;
+	struct ib_send_wr *last_wr;
+};
+
+int pcs_rdma_mr_init_from_msg(struct pcs_rdma_mr *mr, struct ib_device *dev,
+			      struct ib_pd *pd, enum dma_data_direction dir, struct pcs_msg *msg,
+			      size_t offset, size_t end_offset, gfp_t gfp,
+			      struct pcs_ib_mr_pool *ib_mr_pool);
+int pcs_rdma_mr_init(struct pcs_rdma_mr *mr, struct ib_device *dev,
+		     struct ib_pd *pd, enum dma_data_direction dir, size_t size,
+		     gfp_t gfp, struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_destroy(struct pcs_rdma_mr *mr);
+
+struct pcs_rdma_mr* pcs_rdma_mr_alloc(struct ib_device *dev, struct ib_pd *pd,
+				      enum dma_data_direction dir, size_t size, gfp_t gfp,
+				      struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_free(struct pcs_rdma_mr *mr);
+
+void pcs_rdma_mr_sync_for_cpu(struct pcs_rdma_mr *mr, size_t size);
+void pcs_rdma_mr_sync_for_device(struct pcs_rdma_mr *mr, size_t size);
+
+int pcs_rdma_mr_sync_for_msg(struct pcs_rdma_mr *mr, struct pcs_msg *msg,
+			     size_t offset, size_t end_offset, bool to_msg);
+
+struct pcs_rdma_mr_pool
+{
+	size_t mr_size;
+	size_t mr_cnt;
+	struct ib_device *dev;
+	struct ib_pd *pd;
+	enum dma_data_direction dir;
+	gfp_t gfp;
+	struct pcs_ib_mr_pool *ib_mr_pool;
+
+	spinlock_t lock;
+
+	struct list_head mr_list;
+	size_t used_mrs;
+};
+
+int pcs_rdma_mr_pool_init(struct pcs_rdma_mr_pool *pool, size_t mr_size,
+			  size_t mr_cnt, struct ib_device *dev, struct ib_pd *pd,
+			  enum dma_data_direction dir, gfp_t gfp,
+			  struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_pool_destroy(struct pcs_rdma_mr_pool *pool);
+
+struct pcs_rdma_mr* pcs_rdma_mr_pool_get(struct pcs_rdma_mr_pool *pool);
+void pcs_rdma_mr_pool_put(struct pcs_rdma_mr_pool *pool,
+			  struct pcs_rdma_mr *mr);
+
+struct pcs_rdma_msg
+{
+	struct pcs_msg *msg;
+	size_t offset;
+	size_t end_offset;
+
+	struct pcs_rdma_mr_pool *pool;
+	struct pcs_rdma_mr *mr;
+	struct pcs_rdma_mr _inline_mr;
+	void (*cleanup)(struct pcs_rdma_msg *rdma_msg);
+
+	u32 lkey;
+	u32 rkey;
+	u64 iova;
+
+	struct ib_send_wr *first_wr;
+	struct ib_send_wr *last_wr;
+};
+
+int pcs_rdma_msg_init(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+		      size_t offset, size_t end_offset, struct pcs_rdma_mr_pool *pool,
+		      bool try_to_map);
+void pcs_rdma_msg_destroy(struct pcs_rdma_msg *rdma_msg);
+
+#endif /* _PCS_RMDA_RW_H_ */
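
The RE_* wrappers defined in this header are the fault-injection hooks used throughout the RDMA code: when CONFIG_DEBUG_KERNEL and CONFIG_FUSE_KIO_DEBUG are set and rdmaio_io_failing is non-zero, RE_FAIL() periodically turns a successful call into a failure; in other builds the wrappers expand to the bare call. A short sketch of how they are meant to wrap allocations:

	/* Hedged sketch: with failing enabled this occasionally returns
	 * ERR_PTR(-EINVAL) even though ib_alloc_mr() itself would have
	 * succeeded, exercising the caller's error path. */
	static struct ib_mr *example_alloc_mr(struct ib_pd *pd, u32 max_num_sg)
	{
		return RE_PTR_INV(ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, max_num_sg));
	}
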

