[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: implement support RDMA transport
Konstantin Khorenko
khorenko at virtuozzo.com
Thu Oct 15 10:37:35 MSK 2020
The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 05f08453aa9ec285106433ed263d1c288d9430f8
Author: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Date: Thu Oct 15 10:37:35 2020 +0300
fs/fuse kio: implement support RDMA transport
This patch adds support for the RDMA transport to RPC.
For RDMA read requests it is possible to enable/disable usage of a preallocated DMA memory region
through sysfs '/sys/module/fuse_kio_pcs/parameters/rdmaio_use_dma_mr_for_rdma_rw'.
The patch also provides the ability to map a list of msg pages
to an RDMA memory region through sysfs '/sys/module/fuse_kio_pcs/parameters/rdmaio_use_map_for_mr',
instead of copying pages to/from a preallocated and registered region.
The RDMA io queue depth can also be configured through sysfs
'/sys/module/fuse_kio_pcs/parameters/rdmaio_queue_depth'.
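For illustration only (not part of the patch), a minimal user-space sketch of tuning these
parameters through the sysfs files listed above. The helper name and error handling are
made up for this example; the boolean parameters take the usual Y/N values, and
rdmaio_queue_depth is read when a new RDMA connection is set up (see
pcs_rdmaconnect_start below), so changes only affect subsequently established connections.

    #include <stdio.h>

    #define KIO_PARAM_DIR "/sys/module/fuse_kio_pcs/parameters/"

    /* Write a value to one of the fuse_kio_pcs module parameters,
     * e.g. kio_set_param("rdmaio_queue_depth", "16") or
     * kio_set_param("rdmaio_use_dma_mr_for_rdma_rw", "N").
     * Returns 0 on success, -1 on failure. */
    static int kio_set_param(const char *param, const char *value)
    {
            char path[256];
            FILE *f;
            int rc = 0;

            snprintf(path, sizeof(path), KIO_PARAM_DIR "%s", param);
            f = fopen(path, "w");
            if (!f)
                    return -1;
            if (fprintf(f, "%s\n", value) < 0)
                    rc = -1;
            if (fclose(f) != 0)
                    rc = -1;
            return rc;
    }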
https://pmc.acronis.com/browse/VSTOR-4310
Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
+++
fs/fuse kio: resolve name conflicts with ib_core
Rename ib_mr_pool_{init,destroy,get,set} to avoid conflicts
with names from include/rdma/mr_pool.h
mFixes: 867c0e15fddf (fs/fuse kio: implement support RDMA transport)
Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
+++
fs/fuse kio: add dependency on INFINIBAND to Kconfig
Previous patches added RDMA transport support, but
the dependency on INFINIBAND was missing.
mFixes: 867c0e15fddf (fs/fuse kio: implement support RDMA transport)
Signed-off-by: Ildar Ismagilov <ildar.ismagilov at virtuozzo.com>
Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
---
fs/fuse/Kconfig | 2 +-
fs/fuse/Makefile | 5 +-
fs/fuse/kio/pcs/pcs_cs.c | 15 +-
fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 31 +-
fs/fuse/kio/pcs/pcs_rdma_conn.c | 184 ++++
fs/fuse/kio/pcs/pcs_rdma_conn.h | 6 +
fs/fuse/kio/pcs/pcs_rdma_io.c | 1720 ++++++++++++++++++++++++++++++++++++
fs/fuse/kio/pcs/pcs_rdma_io.h | 114 +++
fs/fuse/kio/pcs/pcs_rdma_prot.h | 119 +++
fs/fuse/kio/pcs/pcs_rdma_rw.c | 815 +++++++++++++++++
fs/fuse/kio/pcs/pcs_rdma_rw.h | 184 ++++
11 files changed, 3181 insertions(+), 14 deletions(-)
diff --git a/fs/fuse/Kconfig b/fs/fuse/Kconfig
index f4d23ceabedc..732fb3aa6638 100644
--- a/fs/fuse/Kconfig
+++ b/fs/fuse/Kconfig
@@ -56,7 +56,7 @@ config FUSE_KIO_NULLIO
config FUSE_KIO_PCS
tristate "Enable kdirect PCS io engine"
- depends on FUSE_FS
+ depends on FUSE_FS && INFINIBAND && INFINIBAND_ADDR_TRANS
help
This FUSE extension allows to forward io requests directly to PCS
diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 8ac6c4cfe114..ccc0855a3ea8 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -28,7 +28,10 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
kio/pcs/fuse_io.o \
kio/pcs/fuse_stat.o \
kio/pcs/pcs_sock_conn.o \
- kio/pcs/pcs_auth.o
+ kio/pcs/pcs_auth.o \
+ kio/pcs/pcs_rdma_io.o \
+ kio/pcs/pcs_rdma_rw.o \
+ kio/pcs/pcs_rdma_conn.o
fuse-objs := dev.o dir.o file.o inode.o control.o xattr.o acl.o readdir.o
virtiofs-y += virtio_fs.o
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index 57b07df00f55..1529d06fdc8f 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -7,13 +7,13 @@
#include "pcs_types.h"
#include "pcs_sock_io.h"
#include "pcs_rpc.h"
-#include "pcs_sock_io.h"
#include "pcs_req.h"
#include "pcs_map.h"
#include "pcs_cs.h"
#include "pcs_cs_prot.h"
#include "pcs_cluster.h"
#include "pcs_sock_conn.h"
+#include "pcs_rdma_conn.h"
#include "pcs_ioctl.h"
#include "log.h"
#include "fuse_ktrace.h"
@@ -458,17 +458,12 @@ static void cs_connect(struct pcs_rpc *ep)
connect_start = pcs_sockconnect_start;
} else {
/* TODO: print sock addr using pcs_format_netaddr() */
- if (ep->addr.type != PCS_ADDRTYPE_RDMA) {
- if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
- TRACE("netaddr to sockaddr failed");
- goto fail;
- }
- connect_start = pcs_sockconnect_start;
- } else {
- WARN_ON_ONCE(1);
- /* TODO: rdma connect init */
+ if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+ TRACE("netaddr to sockaddr failed");
goto fail;
}
+ connect_start = ep->addr.type == PCS_ADDRTYPE_RDMA ?
+ pcs_rdmaconnect_start : pcs_sockconnect_start;
}
ep->state = PCS_RPC_CONNECT;
connect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 9a40434ba841..3205288da404 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -39,8 +39,30 @@ unsigned int debugfs_tracing = DEBUGFS_TRACE;
module_param(debugfs_tracing, uint, 0644);
MODULE_PARM_DESC(debugfs_tracing, "Enable/Disbale debugfs tracing");
+bool rdmaio_use_map_for_mr = false;
+module_param(rdmaio_use_map_for_mr, bool, 0644);
+MODULE_PARM_DESC(rdmaio_use_map_for_mr, "Enable/Disable usage of map for RDMA MRs");
+
+bool rdmaio_use_dma_mr_for_rdma_rw = true;
+module_param(rdmaio_use_dma_mr_for_rdma_rw, bool, 0644);
+MODULE_PARM_DESC(rdmaio_use_dma_mr_for_rdma_rw,
+	 "Enable/Disable usage of DMA memory region for RDMA read/write requests");
+
+unsigned int rdmaio_cq_count = 0;
+module_param(rdmaio_cq_count, uint, 0644);
+MODULE_PARM_DESC(rdmaio_cq_count, "RDMA CQ count");
+
+unsigned int rdmaio_cq_period = 0;
+module_param(rdmaio_cq_period, uint, 0644);
+MODULE_PARM_DESC(rdmaio_cq_period, "RDMA CQ period in microseconds");
+
+unsigned int rdmaio_queue_depth = 8;
+module_param(rdmaio_queue_depth, uint, 0644);
+MODULE_PARM_DESC(rdmaio_queue_depth, "RDMA queue depth");
+
#ifdef CONFIG_DEBUG_KERNEL
-static int set_sockio_fail_percent(const char *val, const struct kernel_param *kp)
+
+static int set_io_fail_percent(const char *val, const struct kernel_param *kp)
{
unsigned *p;
int rv;
@@ -57,10 +79,15 @@ static int set_sockio_fail_percent(const char *val, const struct kernel_param *k
}
u32 sockio_fail_percent;
-module_param_call(sockio_fail_percent, set_sockio_fail_percent,
+module_param_call(sockio_fail_percent, set_io_fail_percent,
param_get_uint, &sockio_fail_percent, 0644);
__MODULE_PARM_TYPE(sockio_fail_percent, "uint");
MODULE_PARM_DESC(sockio_fail_percent, "Sock io failing rate in percents");
+
+bool rdmaio_io_failing = false;
+module_param(rdmaio_io_failing, bool, 0644);
+MODULE_PARM_DESC(rdmaio_io_failing, "Enable/Disable RDMA io failing");
+
#endif
static int fuse_ktrace_setup(struct fuse_conn * fc);
diff --git a/fs/fuse/kio/pcs/pcs_rdma_conn.c b/fs/fuse/kio/pcs/pcs_rdma_conn.c
new file mode 100644
index 000000000000..ff6225afbbfa
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_conn.c
@@ -0,0 +1,184 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/types.h>
+
+#include <rdma/rdma_cm.h>
+
+#include "pcs_types.h"
+#include "pcs_rdma_io.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "pcs_auth.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+#define RESOLVE_TIMEOUT_MS 5000
+
+enum {
+ RDMA_MAX_RESP_RES = 0xFF,
+ RDMA_MAX_INIT_DEPTH = 0xFF
+};
+
+struct pcs_rdmaconnect
+{
+ struct pcs_rpc *ep;
+
+ struct rdma_cm_id *cmid;
+ struct pcs_rdmaio *rio;
+
+ struct pcs_rdma_id id;
+
+ enum rdma_cm_event_type cm_event;
+ struct completion cm_done;
+};
+
+extern unsigned int rdmaio_queue_depth;
+
+static void
+conn_param_init(struct rdma_conn_param *cp, struct pcs_rdmaio_conn_req *cr)
+{
+ memset(cp, 0, sizeof(*cp));
+
+ if (cr) {
+ cp->private_data = cr;
+ cp->private_data_len = sizeof(*cr);
+ }
+
+ /* these two guys are about RDMA reads: see man rdma_connect(3) */
+ cp->responder_resources = RDMA_MAX_RESP_RES;
+ cp->initiator_depth = RDMA_MAX_INIT_DEPTH;
+
+ cp->flow_control = 1; /* does not matter */
+ cp->retry_count = 0; /* # retransmissions when no ACK received */
+ cp->rnr_retry_count = 0; /* # RNR retransmissions */
+}
+
+static int pcs_rdma_cm_event_handler(struct rdma_cm_id *cmid,
+ struct rdma_cm_event *event)
+{
+ struct pcs_rdma_id *id = cmid->context;
+ struct pcs_rdmaconnect *rc = container_of(id, struct pcs_rdmaconnect, id);
+ struct rdma_conn_param conn_param;
+
+ TRACE("event: %d, status: %d\n", event->event, event->status);
+
+ rc->cm_event = event->event;
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ if (rdma_resolve_route(cmid, RESOLVE_TIMEOUT_MS))
+ complete(&rc->cm_done);
+ break;
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ rc->rio = pcs_rdma_create(sizeof(struct pcs_rpc_hdr),
+ rc->cmid, rdmaio_queue_depth, rc->ep);
+ if (!rc->rio) {
+ complete(&rc->cm_done);
+ break;
+ }
+ rc->cmid = NULL;
+
+ conn_param_init(&conn_param, &rc->rio->conn_req);
+ if (rdma_connect(cmid, &conn_param)) {
+ TRACE("rdma_connect failed: rio: 0x%p\n", rc->rio);
+ complete(&rc->cm_done);
+ }
+ break;
+ case RDMA_CM_EVENT_ESTABLISHED:
+ cmid->context = &rc->rio->id;
+ complete(&rc->cm_done);
+ break;
+ case RDMA_CM_EVENT_REJECTED:
+ TRACE("pcs_rdma_cm_event_handler reject: %s, rio: 0x%p\n",
+ rdma_reject_msg(cmid, event->status), rc->rio);
+ complete(&rc->cm_done);
+ break;
+ default:
+ complete(&rc->cm_done);
+ }
+
+ return 0;
+}
+
+static int pcs_rdma_event_handler(struct rdma_cm_id *cmid,
+ struct rdma_cm_event *event)
+{
+ struct pcs_rdma_id *id = cmid->context;
+ return id->event_handler(cmid, event);
+}
+
+void pcs_rdmaconnect_start(struct pcs_rpc *ep)
+{
+ struct pcs_rdmaconnect rc = {};
+ struct sockaddr *sa = &ep->sh.sa;
+ int ret;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ TRACE("rdma connection start\n");
+
+ rc.ep = ep;
+ rc.id.event_handler = pcs_rdma_cm_event_handler;
+ init_completion(&rc.cm_done);
+
+ rc.cmid = rdma_create_id(&init_net, pcs_rdma_event_handler, &rc.id,
+ RDMA_PS_TCP, IB_QPT_RC);
+ if (IS_ERR(rc.cmid)) {
+ TRACE("rdma_create_id failed: %ld\n", PTR_ERR(rc.cmid));
+ goto fail;
+ }
+
+ ret = rdma_resolve_addr(rc.cmid, NULL, sa, RESOLVE_TIMEOUT_MS);
+ if (ret) {
+ TRACE("rdma_resolve_addr failed: %d\n", ret);
+ goto fail_cm;
+ }
+
+ wait_for_completion(&rc.cm_done);
+ if (rc.cm_event != RDMA_CM_EVENT_ESTABLISHED) {
+ TRACE("rdma connection failed: %d\n", rc.cm_event);
+ goto fail_cm;
+ }
+
+ TRACE(PEER_FMT " state: %d, rio: 0x%p\n", PEER_ARGS(ep), ep->state, rc.rio);
+ cancel_delayed_work(&ep->timer_work);
+ ep->retries++;
+
+ ep->conn = &rc.rio->netio.ioconn;
+ rc.rio->send_timeout = PCS_SIO_TIMEOUT;
+ rc.rio->rio_state = RIO_STATE_ESTABLISHED;
+ rc.rio->netio.ioconn.destruct = pcs_rdma_ioconn_destruct;
+ rc.rio->netio.tops = &pcs_rdma_netio_tops;
+ rc.rio->netio.getmsg = rpc_get_hdr;
+ rc.rio->netio.eof = rpc_eof_cb;
+ if (ep->gc)
+ list_lru_add(&ep->gc->lru, &ep->lru_link);
+
+ if (ep->flags & PCS_RPC_F_CLNT_PEER_ID)
+ ep->flags |= PCS_RPC_F_PEER_ID;
+
+ ep->state = PCS_RPC_AUTH;
+ ret = rpc_client_start_auth(ep, PCS_AUTH_DIGEST,
+ cc_from_rpc(ep->eng)->cluster_name);
+ if (ret < 0) {
+ TRACE("rdma authorization failed: %d, rio: 0x%p",
+ ret, rc.rio);
+ goto fail; /* since ep->conn is initialized,
+ * rio will be freed in pcs_rpc_reset()
+ */
+ }
+
+ TRACE("rdma connection established: rio: 0x%p\n", rc.rio);
+
+ ep->state = PCS_RPC_APPWAIT;
+ pcs_rpc_enable(ep, 0);
+ return;
+
+fail_cm:
+ if (rc.rio)
+ pcs_rdma_destroy(rc.rio);
+ if (rc.cmid)
+ rdma_destroy_id(rc.cmid);
+fail:
+ pcs_rpc_reset(ep);
+ return;
+}
diff --git a/fs/fuse/kio/pcs/pcs_rdma_conn.h b/fs/fuse/kio/pcs/pcs_rdma_conn.h
new file mode 100644
index 000000000000..88c987e06cd6
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_conn.h
@@ -0,0 +1,6 @@
+#ifndef _PCS_RMDA_CONN_H_
+#define _PCS_RMDA_CONN_H_ 1
+
+void pcs_rdmaconnect_start(struct pcs_rpc *ep);
+
+#endif /* _PCS_RMDA_CONN_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
new file mode 100644
index 000000000000..629a81e8fa3c
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -0,0 +1,1720 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+
+#include <rdma/ib_verbs.h>
+
+#include "pcs_types.h"
+#include "pcs_rdma_io.h"
+#include "pcs_rdma_rw.h"
+#include "pcs_cluster.h"
+#include "pcs_rpc.h"
+#include "log.h"
+
+#define RDMA_THRESHOLD (5*1024)
+
+#define RDMA_MAX_MSG_PAYLOAD (32 << PAGE_SHIFT)
+#define RDMA_MAX_SEGMENTS 256
+
+enum {
+ PCS_RDMA_IO_ERROR,
+ PCS_RDMA_IO_CQE,
+};
+
+//#undef TRACE
+//#define TRACE(fmt, args...) printk(KERN_ERR "%s:%d: " fmt, __func__, __LINE__, ## args)
+
+struct rio_job
+{
+ struct list_head list;
+
+ int (*work)(struct rio_job *job);
+ void (*destroy)(struct rio_job *job);
+};
+
+struct rio_cqe
+{
+ enum ib_wc_status status;
+ int ib_wr_count;
+ void (*done)(struct rio_cqe *cqe, bool sync_mode);
+};
+
+struct rio_rx {
+ struct list_head list;
+
+ struct pcs_rdmaio *rio;
+ struct rio_cqe cqe;
+ struct ib_sge sge;
+ struct ib_recv_wr wr;
+};
+
+enum {
+ TX_FREE, /* free tx request available for use */
+ TX_WAIT_FOR_TX_COMPL, /* tx request sent, wait for TX completion */
+ TX_WAIT_FOR_READ_ACK, /* wait for peer to ack RDMA read */
+ TX_MSG_DONE, /* default: call msg->done() */
+ TX_SUBMIT_RDMA_READ_ACK, /* let our peer know that our RDMA_READ is done */
+};
+
+struct rio_tx {
+ struct list_head list; /* either member of rio->dev->free_txs or rio->active_txs */
+ struct pcs_msg *msg; /* msg to call ->done() when we're done */
+ u64 xid; /* xid that we've read from wire; used to construct ACK */
+ int tx_state; /* what we should do on TX completion; see enum above */
+
+ char *buf;
+ dma_addr_t dma_addr;
+
+ struct pcs_rdmaio *rio;
+
+ struct rio_cqe cqe;
+ struct rio_cqe err_cqe;
+
+ union {
+ struct {
+ struct ib_sge sge;
+ struct ib_send_wr wr;
+ struct pcs_rdma_msg msg;
+ } send;
+
+ struct {
+ struct ib_sge sge;
+ struct ib_rdma_wr wr;
+ struct pcs_rdma_msg msg;
+ } rdma_mr;
+
+ struct pcs_rdma_rw rdma_rw;
+ };
+
+ void (*cleanup)(struct rio_tx *tx);
+};
+
+struct rio_rdma_read_job {
+ struct rio_job job;
+
+ struct pcs_rdmaio *rio;
+ struct pcs_msg *msg;
+ int offset;
+ struct pcs_remote_buf rb;
+};
+
+struct pcs_rdma_device {
+ struct ib_device *ib_dev;
+ struct ib_pd *pd;
+
+ struct list_head free_txs; /* list head of free TX frames */
+ int free_txs_cnt;
+
+ struct pcs_ib_mr_pool ib_mr_pool;
+ struct pcs_rdma_mr_pool sd_mr_pool;
+ struct pcs_rdma_mr_pool rd_mr_pool;
+};
+
+extern bool rdmaio_use_map_for_mr;
+extern bool rdmaio_use_dma_mr_for_rdma_rw;
+extern unsigned int rdmaio_cq_count;
+extern unsigned int rdmaio_cq_period;
+
+static void rio_abort(struct pcs_rdmaio *rio, int error);
+
+static void rio_rx_done(struct rio_cqe *cqe, bool sync_mode);
+static void rio_tx_done(struct rio_cqe *cqe, bool sync_mode);
+static void rio_tx_err_occured(struct rio_cqe *cqe, bool sync_mode);
+
+/*
+ * A trivial helper representing 1:1 mapping between
+ * rio->rx_descs[RIO_N_RX] and rio->rx_bufs[queue_depth * RIO_MSG_SIZE]
+ */
+static char *rx2buf(struct pcs_rdmaio *rio, struct rio_rx *rx)
+{
+ return rio->rx_bufs + RIO_MSG_SIZE * (rx - rio->rx_descs);
+}
+static dma_addr_t rx2dma(struct pcs_rdmaio *rio, struct rio_rx *rx)
+{
+ return rio->rx_bufs_dma + RIO_MSG_SIZE * (rx - rio->rx_descs);
+}
+
+/* Only called when rio->write_queue is not empty */
+static struct pcs_msg *rio_dequeue_msg(struct pcs_rdmaio *rio)
+{
+ struct pcs_msg *msg = list_first_entry(&rio->write_queue,
+ struct pcs_msg, list);
+ list_del_init(&msg->list);
+ return msg;
+}
+
+/* Only called when rio->reserved_queue is not empty */
+static struct pcs_msg *rio_dequeue_reserved_msg(struct pcs_rdmaio *rio)
+{
+ struct pcs_msg *msg = list_first_entry(&rio->reserved_queue,
+ struct pcs_msg, list);
+ list_del_init(&msg->list);
+ return msg;
+}
+
+static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct pcs_msg *msg, int done)
+{
+ if (done) {
+ pcs_msg_sent(msg);
+ msg->done(msg);
+ } else {
+ tx->msg = msg;
+ list_add_tail(&tx->list, &rio->active_txs);
+ }
+}
+
+static struct rio_tx *rio_alloc_tx(struct pcs_rdma_device *dev,
+ int state)
+{
+ struct rio_tx *tx;
+
+ tx = RE_NULL(kzalloc(sizeof(struct rio_tx), GFP_NOIO));
+ if (!tx)
+ return NULL;
+
+ tx->buf = RE_NULL(ib_dma_alloc_coherent(dev->ib_dev, RIO_MSG_SIZE,
+ &tx->dma_addr,
+ GFP_NOIO | __GFP_NOWARN));
+ if (!tx->buf) {
+ kfree(tx);
+ return NULL;
+ }
+
+ tx->tx_state = state;
+
+ return tx;
+}
+
+static void rio_free_tx(struct pcs_rdma_device *dev, struct rio_tx *tx)
+{
+ ib_dma_free_coherent(dev->ib_dev, RIO_MSG_SIZE, tx->buf, tx->dma_addr);
+ kfree(tx);
+}
+
+static struct rio_tx *rio_get_tx(struct pcs_rdma_device *dev)
+{
+ struct rio_tx *tx;
+
+ if (list_empty(&dev->free_txs))
+ return NULL;
+
+ tx = list_first_entry(&dev->free_txs, struct rio_tx, list);
+ list_del(&tx->list);
+ dev->free_txs_cnt--;
+ BUG_ON(dev->free_txs_cnt < 0);
+
+ BUG_ON(tx->tx_state != TX_FREE);
+
+ tx->tx_state = TX_MSG_DONE;
+ tx->xid = 0;
+
+ return tx;
+}
+
+static void rio_put_tx(struct pcs_rdma_device *dev, struct rio_tx *tx)
+{
+ BUG_ON(tx->tx_state == TX_FREE);
+
+ if (tx->cleanup) {
+ tx->cleanup(tx);
+ tx->cleanup = NULL;
+ }
+ tx->msg = NULL;
+ tx->xid = 0;
+ tx->tx_state = TX_FREE;
+
+ list_add(&tx->list, &dev->free_txs);
+ dev->free_txs_cnt++;
+}
+
+enum {
+ SUBMIT_REGULAR,
+ SUBMIT_NOOP,
+ SUBMIT_RDMA_READ_ACK,
+};
+
+static inline void rio_cqe_init(struct rio_cqe *cqe, int ib_wr_count,
+ void (*done)(struct rio_cqe *cqe, bool sync_mode))
+{
+ cqe->status = IB_WC_SUCCESS;
+ cqe->ib_wr_count = ib_wr_count;
+ cqe->done = done;
+}
+
+static inline void rio_job_init(struct rio_job *job,
+ int (*work)(struct rio_job *job),
+ void (*destroy)(struct rio_job *job))
+{
+ INIT_LIST_HEAD(&job->list);
+ job->work = work;
+ job->destroy = destroy;
+}
+
+static inline void rio_post_tx_job(struct pcs_rdmaio *rio,
+ struct rio_job *job)
+{
+ list_add_tail(&job->list, &rio->tx_jobs);
+}
+
+static inline void rio_perform_tx_jobs(struct pcs_rdmaio *rio)
+{
+ struct rio_job *job, *tmp;
+
+ list_for_each_entry_safe(job, tmp, &rio->tx_jobs, list) {
+ if (job->work(job) == -EAGAIN)
+ break;
+ list_del(&job->list);
+ job->destroy(job);
+ }
+}
+
+static int rio_rx_post(struct pcs_rdmaio *rio, struct rio_rx *rx,
+ u32 length)
+{
+ int ret;
+
+ if (rio->rio_state == RIO_STATE_ABORTED)
+ return -ECONNABORTED;
+
+ rx->rio = rio;
+
+ rx->sge.addr = rx2dma(rio, rx);
+ rx->sge.length = length;
+ rx->sge.lkey = rio->dev->pd->local_dma_lkey;
+
+ memset(&rx->wr, 0, sizeof(rx->wr));
+ rx->wr.wr_id = (uintptr_t)&rx->cqe;
+ rx->wr.sg_list = &rx->sge;
+ rx->wr.num_sge = 1;
+
+ rio_cqe_init(&rx->cqe, 1, rio_rx_done);
+
+ ret = RE_INV(ib_post_recv(rio->cmid->qp, &rx->wr, NULL));
+ if (ret) {
+ TRACE("ib_post_recv failed: %d, rio: 0x%p\n", ret, rio);
+ } else {
+ rio->n_rx_posted++;
+ }
+
+ return ret;
+}
+
+static int rio_tx_post(struct pcs_rdmaio *rio, struct rio_tx *tx,
+ struct ib_send_wr *send_wr)
+{
+ struct ib_send_wr *wr;
+ int ib_wr_count = 0;
+ int ret;
+
+ if (rio->rio_state == RIO_STATE_ABORTED)
+ return -ECONNABORTED;
+
+ tx->rio = rio;
+
+ for (wr = send_wr; wr; wr = wr->next) {
+ BUG_ON(wr->wr_id);
+ BUG_ON(wr->send_flags & IB_SEND_SIGNALED);
+ if (wr->next) {
+ wr->wr_id = (uintptr_t)&tx->err_cqe;
+ wr->send_flags = 0;
+ } else {
+ wr->wr_id = (uintptr_t)&tx->cqe;
+ wr->send_flags = IB_SEND_SIGNALED;
+ }
+ ib_wr_count++;
+ }
+
+ rio_cqe_init(&tx->cqe, ib_wr_count, rio_tx_done);
+ rio_cqe_init(&tx->err_cqe, 0, rio_tx_err_occured);
+
+ if (rio->n_tx_posted + ib_wr_count > rio->max_send_wr) {
+ TRACE("ib send queue overflow: rio: 0x%p\n", rio);
+ return -ENOMEM;
+ }
+
+ ret = RE_INV(ib_post_send(rio->cmid->qp, send_wr, NULL));
+ if (ret) {
+ TRACE("ib_post_send failed: %d, rio: 0x%p\n", ret, rio);
+ } else {
+ rio->n_tx_posted += ib_wr_count;
+ }
+
+ return ret;
+}
+
+static int rio_tx_post_send(struct pcs_rdmaio *rio, struct rio_tx *tx,
+ u32 length, struct ib_send_wr *first_wr,
+ struct ib_send_wr *last_wr)
+{
+ struct ib_send_wr *send_wr;
+
+ tx->send.sge.addr = tx->dma_addr;
+ tx->send.sge.length = length;
+ tx->send.sge.lkey = rio->dev->pd->local_dma_lkey;
+
+ memset(&tx->send.wr, 0, sizeof(tx->send.wr));
+ tx->send.wr.opcode = IB_WR_SEND;
+ tx->send.wr.sg_list = &tx->send.sge;
+ tx->send.wr.num_sge = 1;
+
+ send_wr = &tx->send.wr;
+ if (first_wr && last_wr) {
+ last_wr->next = send_wr;
+ send_wr = first_wr;
+ }
+
+ return rio_tx_post(rio, tx, send_wr);
+}
+
+static int rio_tx_post_rdma_mr_read(struct pcs_rdmaio *rio, struct rio_tx *tx,
+ u64 remote_addr, u32 rkey, u32 length)
+{
+ struct ib_send_wr *send_wr;
+
+ tx->rdma_mr.sge.addr = tx->rdma_mr.msg.iova;
+ tx->rdma_mr.sge.length = length;
+ tx->rdma_mr.sge.lkey = tx->rdma_mr.msg.lkey;
+
+ memset(&tx->rdma_mr.wr, 0, sizeof(tx->rdma_mr.wr));
+ tx->rdma_mr.wr.wr.opcode = IB_WR_RDMA_READ;
+ tx->rdma_mr.wr.wr.sg_list = &tx->rdma_mr.sge;
+ tx->rdma_mr.wr.wr.num_sge = 1;
+ tx->rdma_mr.wr.remote_addr = remote_addr;
+ tx->rdma_mr.wr.rkey = rkey;
+
+ send_wr = &tx->rdma_mr.wr.wr;
+ if (tx->rdma_mr.msg.first_wr && tx->rdma_mr.msg.last_wr) {
+ tx->rdma_mr.msg.last_wr->next = send_wr;
+ send_wr = tx->rdma_mr.msg.first_wr;
+ }
+
+ return rio_tx_post(rio, tx, send_wr);
+}
+
+static int rio_tx_post_rdma_rw_read(struct pcs_rdmaio *rio, struct rio_tx *tx)
+{
+ if (!tx->rdma_rw.nr_wrs)
+ return -EINVAL;
+ return rio_tx_post(rio, tx, &tx->rdma_rw.wrs->wr);
+}
+
+static void rio_tx_cleanup_rdma_mr(struct rio_tx *tx)
+{
+ pcs_rdma_msg_destroy(&tx->rdma_mr.msg);
+}
+
+static void rio_tx_cleanup_rdma_rw(struct rio_tx *tx)
+{
+ pcs_rdma_rw_destroy(&tx->rdma_rw);
+}
+
+static int rio_submit_rdma_read(struct pcs_rdmaio *rio, struct pcs_msg *msg,
+ int offset, struct pcs_remote_buf *rb, bool allow_again)
+{
+ struct pcs_rdma_device *dev = rio->dev;
+ struct rio_tx *tx;
+
+ tx = RE_NULL(rio_get_tx(dev));
+ if (!tx) {
+ if (allow_again)
+ return -EAGAIN;
+ goto fail;
+ }
+
+ BUG_ON(!rb);
+
+ tx->tx_state = TX_SUBMIT_RDMA_READ_ACK;
+ tx->msg = msg;
+ tx->xid = rb->xid;
+
+ if (rdmaio_use_dma_mr_for_rdma_rw) {
+ if (pcs_rdma_rw_init_from_msg(&tx->rdma_rw, rio->cmid->device,
+ DMA_FROM_DEVICE, rb->rbuf, rb->rkey,
+ rio->dev->pd->local_dma_lkey, msg, offset,
+ offset + rb->rlen, GFP_NOIO, rio->cmid->qp->max_read_sge)) {
+ TRACE("pcs_rdma_rw_init_from_msg failed, try fallback: rio: 0x%p\n", rio);
+ goto fallback;
+ }
+ tx->cleanup = rio_tx_cleanup_rdma_rw;
+
+ if (rio_tx_post_rdma_rw_read(rio, tx)) {
+ TRACE("rio_tx_post_rdma_rw_read failed: rio: 0x%p\n", rio);
+ goto fail;
+ }
+ } else {
+fallback:
+ if (pcs_rdma_msg_init(&tx->rdma_mr.msg, msg, offset, offset + rb->rlen,
+ &dev->rd_mr_pool, rdmaio_use_map_for_mr)) {
+ TRACE("rio_rdma_mr_init failed: rio: 0x%p\n", rio);
+ goto fail;
+ }
+ tx->cleanup = rio_tx_cleanup_rdma_mr;
+
+ if (rio_tx_post_rdma_mr_read(rio, tx, rb->rbuf, rb->rkey, rb->rlen)) {
+ TRACE("rio_tx_post_rdma_mr_read failed: rio: 0x%p\n", rio);
+ goto fail;
+ }
+ }
+
+ return 0;
+
+fail:
+ if (tx)
+ rio_put_tx(dev, tx);
+ pcs_free_msg(msg);
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+
+ return -EIO;
+}
+
+static int rio_rdma_read_job_work(struct rio_job *j)
+{
+ struct rio_rdma_read_job *job = container_of(j, struct rio_rdma_read_job, job);
+ struct pcs_rdmaio *rio = job->rio;
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED) {
+ pcs_free_msg(job->msg);
+ return 0;
+ }
+
+ return rio_submit_rdma_read(rio, job->msg, job->offset,
+ &job->rb, true);
+}
+
+static void rio_rdma_read_job_destroy(struct rio_job *j)
+{
+ struct rio_rdma_read_job *job = container_of(j, struct rio_rdma_read_job, job);
+ kfree(job);
+}
+
+static inline struct rio_rdma_read_job* rio_rdma_read_job_alloc(struct pcs_rdmaio *rio,
+ struct pcs_msg *msg,
+ int offset,
+ struct pcs_remote_buf *rb)
+{
+ struct rio_rdma_read_job *job;
+
+ job = RE_NULL(kzalloc(sizeof(struct rio_rdma_read_job), GFP_NOIO));
+ if (!job)
+ return NULL;
+
+ rio_job_init(&job->job, rio_rdma_read_job_work, rio_rdma_read_job_destroy);
+ job->rio = rio;
+ job->msg = msg;
+ job->offset = offset;
+ memcpy(&job->rb, rb, sizeof(job->rb));
+
+ return job;
+}
+
+static int msg_is_large(struct pcs_msg *msg)
+{
+ int hdr_len = sizeof(struct pcs_rdmaio_hdr);
+ return msg->size + hdr_len > RDMA_THRESHOLD;
+}
+
+static int rio_init_msg(char *buf, int payload_size, int credits, int submit_type,
+ struct pcs_remote_buf **rb, struct pcs_rdma_ack **rack)
+{
+ struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+ int hdr_len = sizeof(*hdr);
+ int type = RIO_MSG_IMMEDIATE;
+ int addon_len = 0;
+
+ switch (submit_type) {
+ case SUBMIT_NOOP:
+ type = RIO_MSG_NOOP;
+ break;
+ case SUBMIT_REGULAR:
+ if (hdr_len + payload_size > RDMA_THRESHOLD) {
+ type = RIO_MSG_RDMA_READ_REQ;
+ *rb = (struct pcs_remote_buf *)(buf + hdr_len);
+ addon_len = sizeof(struct pcs_remote_buf);
+ }
+ break;
+ case SUBMIT_RDMA_READ_ACK:
+ type = RIO_MSG_RDMA_READ_ACK;
+ *rack = (struct pcs_rdma_ack *)(buf + hdr_len);
+ addon_len = sizeof(struct pcs_rdma_ack);
+ break;
+ default:
+ BUG();
+ }
+
+ hdr->magic = RIO_MAGIC;
+ hdr->version = RIO_VERSION;
+ hdr->type = type;
+ hdr->size = hdr_len + addon_len;
+ hdr->credits = credits;
+
+ return hdr->size;
+}
+
+static void rio_update_msg_immediate(char *buf, int copied)
+{
+ struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+
+ hdr->size += copied;
+}
+
+static void rio_tx_cleanup_send(struct rio_tx *tx)
+{
+ pcs_rdma_msg_destroy(&tx->send.msg);
+}
+
+static int rio_submit(struct pcs_rdmaio *rio, struct pcs_msg *msg, int type, u64 xid, int status,
+ bool allow_again)
+{
+ struct pcs_rdma_device *dev = rio->dev;
+ struct rio_tx *tx;
+ struct ib_send_wr *first_tx_wr = NULL;
+ struct ib_send_wr *last_tx_wr = NULL;
+ int credits = rio->n_os_credits;
+ int msg_size = msg ? msg->size : 0;
+ struct pcs_remote_buf *rb = NULL;
+ struct pcs_rdma_ack *rack = NULL;
+ int hdr_len;
+ size_t tx_length;
+ char *payload;
+ int offset = 0;
+ struct iov_iter it;
+
+ tx = RE_NULL(rio_get_tx(dev));
+ if (!tx) {
+ if (allow_again)
+ return -EAGAIN;
+ goto fail;
+ }
+
+ hdr_len = rio_init_msg(tx->buf, msg_size, credits, type, &rb, &rack);
+ tx_length = hdr_len;
+ payload = tx->buf + hdr_len;
+
+ if (rack) {
+ rack->xid = xid;
+ rack->status = status;
+ } else if (rb) {
+ rio->xid_generator++;
+ rb->xid = tx->xid = rio->xid_generator;
+ tx->tx_state = TX_WAIT_FOR_TX_COMPL;
+ }
+
+ iov_iter_kvec(&it, WRITE, NULL, 0, 0);
+ while (offset < msg_size) {
+ size_t copy, res;
+
+ if (!iov_iter_count(&it))
+ msg->get_iter(msg, offset, &it, WRITE);
+
+ copy = iov_iter_count(&it);
+ if (copy > msg_size - offset)
+ copy = msg_size - offset;
+
+ if (hdr_len + offset + copy > RDMA_THRESHOLD) {
+ if (tx_length < hdr_len + rio->hdr_size) {
+ copy = RDMA_THRESHOLD - offset - hdr_len;
+ } else {
+ if (pcs_rdma_msg_init(&tx->send.msg, msg, offset, msg_size,
+ &dev->sd_mr_pool, rdmaio_use_map_for_mr)) {
+ TRACE("rio_rdma_mr_init failed: rio: 0x%p\n", rio);
+ goto fail;
+ }
+ tx->cleanup = rio_tx_cleanup_send;
+ first_tx_wr = tx->send.msg.first_wr;
+ last_tx_wr = tx->send.msg.last_wr;
+ rb->rbuf = tx->send.msg.iova;
+ rb->rkey = tx->send.msg.rkey;
+ rb->rlen = msg_size - offset;
+ break;
+ }
+ }
+
+ res = copy_from_iter(payload + offset, copy, &it);
+ BUG_ON(res != copy);
+ tx_length += copy;
+
+ offset += copy;
+ rio_update_msg_immediate(tx->buf, copy);
+ }
+
+ if (rio_tx_post_send(rio, tx, tx_length, first_tx_wr, last_tx_wr)) {
+ TRACE("rio_tx_post_send failed: rio: 0x%p\n", rio);
+ goto fail;
+ }
+
+ rio->n_os_credits -= credits;
+ if (msg) {
+ rio->n_peer_credits--;
+ if (rb)
+ rio->n_reserved_credits--;
+ BUG_ON(rio->n_peer_credits < 0);
+ BUG_ON(rio->n_reserved_credits < 0);
+
+ /*
+ * It's possible to see RX completion for response to this message
+ * *before* we see TX completion for this message. This will result
+ * in RPC's handle_response failing to find corresponding TX by xid.
+ *
+ * Thus, we shouldn't wait for TX completion to tell upper layer that
+ * the message has been sent and do it right after
+ * rio_tx_post_send completes (similar to TCP). If
+ * rio_tx_post_send() fails eventually, we will receive TX
+ * completion with an error flag and cancel all
+ * outstanding/pending RPC requests. So we are not going to
+ * lose an error.
+ *
+ * But, if the message is big enough to trigger RDMA READ
+ * transfer, we are going to call ->done() callback after we
+ * receive RDMA_READ_ACK message from our peer. Since messages
+ * in a single RX queue are guaranteed to come in order, there
+ * is no race in this case.
+ */
+ rio_msg_sent(rio, tx, msg, rb == NULL);
+ }
+
+ return 0;
+
+fail:
+ if (tx)
+ rio_put_tx(dev, tx);
+ if (msg)
+ list_add(&msg->list, &rio->write_queue);
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+
+ return -EIO;
+}
+
+static inline void rio_enable_kick(struct pcs_rdmaio *rio)
+{
+ rio->no_kick--;
+ BUG_ON(rio->no_kick < 0);
+}
+
+static inline void rio_disable_kick(struct pcs_rdmaio *rio)
+{
+ rio->no_kick++;
+}
+
+static inline void rio_kick_write_queue(struct pcs_rdmaio *rio)
+{
+ if (rio->no_kick)
+ return;
+ rio_disable_kick(rio);
+
+ rio_perform_tx_jobs(rio);
+
+ /* Main loop sending large messages from reserved_queue */
+ while (rio->rio_state == RIO_STATE_ESTABLISHED &&
+ rio->dev->free_txs_cnt > rio->queue_depth &&
+ !list_empty(&rio->reserved_queue) && rio->n_peer_credits &&
+ rio->n_reserved_credits) {
+ struct pcs_msg *msg = rio_dequeue_reserved_msg(rio);
+ if (rio_submit(rio, msg, SUBMIT_REGULAR, 0, 0, true) == -EAGAIN) {
+ list_add(&msg->list, &rio->reserved_queue);
+ break;
+ }
+ }
+
+ /* Main loop sending ordinary messages from write_queue */
+ while (rio->rio_state == RIO_STATE_ESTABLISHED &&
+ rio->dev->free_txs_cnt > rio->queue_depth &&
+ !list_empty(&rio->write_queue) && rio->n_peer_credits) {
+ struct pcs_msg *msg = rio_dequeue_msg(rio);
+
+ if (!rio->n_reserved_credits && msg_is_large(msg)) {
+ list_add_tail(&msg->list, &rio->reserved_queue);
+ } else if (rio_submit(rio, msg, SUBMIT_REGULAR, 0, 0, true) == -EAGAIN) {
+ list_add(&msg->list, &rio->write_queue);
+ break;
+ }
+ }
+
+ /* Return credits by NOOP only if we have many enough to return AND
+ * we cannot piggyback it by sending a message from write_queue */
+ if (rio->rio_state == RIO_STATE_ESTABLISHED &&
+ rio->dev->free_txs_cnt > rio->queue_depth &&
+ rio->n_os_credits >= rio->n_th_credits)
+ rio_submit(rio, NULL, SUBMIT_NOOP, 0, 0, true);
+
+ rio_enable_kick(rio);
+}
+
+static void rio_handle_tx(struct pcs_rdmaio *rio, struct rio_tx *tx, int ok)
+{
+ struct pcs_msg *msg = tx->msg;
+ u64 xid = tx->xid;
+
+ /* override remote success if we already aborted */
+ if (rio->rio_state == RIO_STATE_ABORTED)
+ ok = 0;
+
+ if (!ok)
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+
+ switch (tx->tx_state) {
+ case TX_SUBMIT_RDMA_READ_ACK:
+ rio_put_tx(rio->dev, tx);
+ rio_submit(rio, NULL, SUBMIT_RDMA_READ_ACK, xid, !ok, false);
+ break;
+ case TX_WAIT_FOR_TX_COMPL:
+ case TX_WAIT_FOR_READ_ACK:
+ if (++tx->tx_state != TX_MSG_DONE)
+ return;
+ case TX_MSG_DONE:
+ rio_put_tx(rio->dev, tx);
+ break;
+ default:
+ BUG();
+ }
+
+ if (msg) {
+ if (!ok)
+ pcs_set_local_error(&msg->error, PCS_ERR_NET_ABORT);
+
+ rio_msg_sent(rio, NULL, msg, 1);
+ }
+}
+
+/*
+ * rio wire header is already stripped, buf points to payload data (pcs_rpc hdr)
+ */
+static int rio_handle_rx_immediate(struct pcs_rdmaio *rio, char *buf, int len,
+ struct pcs_remote_buf *rb, int *throttle)
+{
+ struct pcs_msg *msg;
+ u32 msg_size;
+ int offset = rio->hdr_size;
+ struct iov_iter it;
+
+ if (len < rio->hdr_size) {
+ TRACE("rio read short msg: %d < %d, rio: 0x%p\n", len,
+ rio->hdr_size, rio);
+ return PCS_ERR_NET_ABORT;
+ }
+
+ msg = rio->netio.getmsg(&rio->netio, buf, &msg_size);
+ if (msg == NULL) {
+ int err = 0;
+ if (rio->throttled)
+ *throttle = 1;
+ else
+ err = PCS_ERR_NOMEM;
+ return err;
+ } else if (msg == PCS_TRASH_MSG) {
+ TRACE("rio drop trash msg: %u, rio: 0x%p\n", msg_size, rio);
+ return 0;
+ }
+
+ if (msg->size != len + (rb ? rb->rlen : 0)) {
+ TRACE("rio read wrong len: %d != %d (%llx/%x/%d), rio: 0x%p",
+ len, msg->size, rb ? rb->rbuf : 0ULL,
+ rb ? rb->rkey : 0, rb ? rb->rlen : -1,
+ rio);
+ pcs_free_msg(msg);
+ return PCS_ERR_NET_ABORT;
+ }
+
+ iov_iter_kvec(&it, READ, NULL, 0, 0);
+ while (offset < len) {
+ size_t body_len, res;
+
+ if (!iov_iter_count(&it))
+ msg->get_iter(msg, offset, &it, READ);
+
+ body_len = iov_iter_count(&it);
+ if (body_len > len - offset)
+ body_len = len - offset;
+
+ res = copy_to_iter(buf + offset, body_len, &it);
+ BUG_ON(res != body_len);
+
+ offset += body_len;
+ }
+
+ if (len == msg->size) {
+ msg->done(msg);
+ } else if (rio_submit_rdma_read(rio, msg, offset, rb, true) == -EAGAIN) {
+ struct rio_rdma_read_job *job;
+ job = rio_rdma_read_job_alloc(rio, msg, offset, rb);
+ if (!job)
+ rio_submit_rdma_read(rio, msg, offset, rb, false);
+ else
+ rio_post_tx_job(rio, &job->job);
+ }
+
+ return 0;
+}
+
+static int rio_handle_rx_read_ack(struct pcs_rdmaio *rio,
+ struct pcs_rdma_ack *rack)
+{
+ struct rio_tx *tx;
+
+ list_for_each_entry(tx, &rio->active_txs, list)
+ if (tx->xid == rack->xid) {
+ list_del(&tx->list);
+ rio_handle_tx(rio, tx, !rack->status);
+ return 0;
+ }
+
+ return PCS_ERR_NET_ABORT;
+}
+
+/*
+ * When we see RX coming from the wire very first time, flag "pended" is
+ * false and we naturally update n_rx_posted and n_peer_credits.
+ *
+ * Later on, due to throttling, the RX may reside in pended_rxs for a while.
+ * Then, handling unthrottle event, we will see this RX again, the "pended"
+ * flag is true. This means we should not touch n_rx_posted and
+ * n_peer_credits again.
+ */
+static void rio_handle_rx(struct pcs_rdmaio *rio, struct rio_rx *rx,
+ enum ib_wc_status status, int pended)
+{
+ char *buf = rx2buf(rio, rx);
+ int ok = (status == IB_WC_SUCCESS) &&
+ (rio->rio_state == RIO_STATE_ESTABLISHED);
+ char *payload = NULL;
+ int payload_size = 0;
+ int credits = 0;
+ int throttle = 0;
+ int type;
+ int err = PCS_ERR_NET_ABORT;
+ struct pcs_remote_buf *rb = NULL;
+ struct pcs_rdma_ack *rack = NULL;
+
+ if (!ok) {
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+ return;
+ }
+
+ type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+ rio->queue_depth);
+
+ switch (type) {
+ case RIO_MSG_IMMEDIATE:
+ case RIO_MSG_RDMA_READ_REQ:
+ err = rio_handle_rx_immediate(rio, payload, payload_size, rb, &throttle);
+ if (err)
+ goto do_abort;
+ break;
+ case RIO_MSG_NOOP:
+ /* for now, it only returns credits */
+ break;
+ case RIO_MSG_RDMA_READ_ACK:
+ BUG_ON(!rack);
+ err = rio_handle_rx_read_ack(rio, rack);
+ if (err)
+ goto do_abort;
+ break;
+ default:
+ goto do_abort;
+ }
+
+ if (!throttle) {
+ if (rio_rx_post(rio, rx, RIO_MSG_SIZE)) {
+ TRACE("rio_rx_post failed: rio: 0x%p\n", rio);
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+ return;
+ }
+
+ if (type != RIO_MSG_NOOP &&
+ type != RIO_MSG_RDMA_READ_ACK)
+ rio->n_os_credits++;
+
+ if (type == RIO_MSG_RDMA_READ_ACK)
+ rio->n_reserved_credits++;
+
+ BUG_ON(rio->n_reserved_credits > rio->queue_depth);
+ if (rio->n_os_credits > rio->queue_depth) {
+ TRACE("n_os_credits overflow: rio: 0x%p\n", rio);
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+ return;
+ }
+ } else
+ list_add(&rx->list, &rio->pended_rxs);
+
+ if (!pended)
+ rio->n_peer_credits += credits;
+
+ return;
+
+do_abort:
+ rio_abort(rio, err);
+}
+
+static void rio_handle_pended_rxs(struct pcs_rdmaio *rio)
+{
+ LIST_HEAD(local);
+
+ list_splice_init(&rio->pended_rxs, &local);
+
+ while (!list_empty(&local)) {
+ struct rio_rx *rx;
+
+ rx = list_first_entry(&local, struct rio_rx, list);
+ list_del(&rx->list);
+
+ rio_handle_rx(rio, rx, IB_WC_SUCCESS, 1);
+ }
+}
+
+static void rio_rx_done(struct rio_cqe *cqe, bool sync_mode)
+{
+ struct rio_rx *rx = container_of(cqe, struct rio_rx, cqe);
+ struct pcs_rdmaio *rio = rx->rio;
+
+ rio->n_rx_posted -= cqe->ib_wr_count;
+ BUG_ON(rio->n_rx_posted < 0);
+
+ if (sync_mode) {
+ if (!rio_rx_post(rio, rx, RIO_MSG_SIZE))
+ rio->n_os_credits++;
+ } else {
+ rio_handle_rx(rio, rx, cqe->status, 0);
+ }
+}
+
+static void rio_tx_err_occured(struct rio_cqe *cqe, bool sync_mode)
+{
+ TRACE("status: %d\n", cqe->status);
+}
+
+static void rio_tx_done(struct rio_cqe *cqe, bool sync_mode)
+{
+ struct rio_tx *tx = container_of(cqe, struct rio_tx, cqe);
+ struct pcs_rdmaio *rio = tx->rio;
+
+ if (cqe->status == IB_WC_SUCCESS)
+ cqe->status = tx->err_cqe.status;
+
+ rio->n_tx_posted -= cqe->ib_wr_count;
+ BUG_ON(rio->n_tx_posted < 0);
+
+ if (sync_mode)
+ rio_put_tx(rio->dev, tx);
+ else
+ rio_handle_tx(rio, tx, cqe->status == IB_WC_SUCCESS);
+}
+
+static inline struct rio_cqe* rio_poll_cq(struct pcs_rdmaio *rio)
+{
+ struct rio_cqe *cqe = NULL;
+
+ if (rio->wc_idx >= rio->wc_cnt) {
+ rio->wc_cnt = ib_poll_cq(rio->cq, ARRAY_SIZE(rio->wcs),
+ rio->wcs);
+ rio->wc_idx = 0;
+ }
+
+ if (rio->wc_idx < rio->wc_cnt) {
+ cqe = (void*)rio->wcs[rio->wc_idx].wr_id;
+ if (cqe->status == IB_WC_SUCCESS &&
+ rio->wcs[rio->wc_idx].status != IB_WC_SUCCESS)
+ cqe->status = rio->wcs[rio->wc_idx].status;
+ rio->wc_idx++;
+ }
+
+ return cqe;
+}
+
+static inline int rio_req_notify_cq(struct pcs_rdmaio *rio)
+{
+ return ib_req_notify_cq(rio->cq, IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS);
+}
+
+static void pcs_rdma_cq_comp_handler(struct ib_cq *cq, void *private)
+{
+ struct pcs_rdmaio *rio = private;
+ struct pcs_rpc *ep = rio->netio.parent;
+
+ set_bit(PCS_RDMA_IO_CQE, &rio->io_flags);
+ wake_up(&rio->waitq);
+ pcs_rpc_kick_queue(ep);
+}
+
+static inline int rio_comp_perform(struct pcs_rdmaio *rio)
+{
+ struct rio_cqe *cqe;
+ int count = 0;
+
+ while ((cqe = rio_poll_cq(rio))) {
+ rio_disable_kick(rio);
+ cqe->done(cqe, false);
+ rio_enable_kick(rio);
+
+ rio_kick_write_queue(rio);
+ count++;
+ }
+
+ return count;
+}
+
+static void pcs_rdma_cq_event_handler(struct ib_event *event, void *private)
+{
+ struct pcs_rdmaio *rio = private;
+ TRACE("rio: 0x%p\n", rio);
+}
+
+static int pcs_rdma_io_event_handler(struct rdma_cm_id *cmid,
+ struct rdma_cm_event *event)
+{
+ struct pcs_rdma_id *id = cmid->context;
+ struct pcs_rdmaio *rio = container_of(id, struct pcs_rdmaio, id);
+ struct pcs_rpc *ep = rio->netio.parent;
+
+ TRACE("rio: 0x%p, event: %d, status: %d\n", rio, event->event, event->status);
+
+ set_bit(PCS_RDMA_IO_ERROR, &rio->io_flags);
+ pcs_rpc_kick_queue(ep);
+
+ return 0;
+}
+
+static void pcs_rdma_qp_event_handler(struct ib_event *event, void *context)
+{
+ struct pcs_rdmaio *rio = context;
+ TRACE("rio: 0x%p, event: %d\n", rio, event->event);
+}
+
+static struct pcs_rdma_device *pcs_rdma_device_create(struct rdma_cm_id *cmid,
+ int queue_depth,
+ int send_queue_depth)
+{
+ struct pcs_rdma_device *dev;
+ struct rio_tx *tx;
+ u32 max_num_sg = min_t(u32, RDMA_MAX_SEGMENTS,
+ cmid->device->attrs.max_fast_reg_page_list_len);
+ int i;
+
+ dev = RE_NULL(kzalloc(sizeof(*dev), GFP_NOIO));
+ if (!dev)
+ return NULL;
+
+ dev->ib_dev = cmid->device;
+
+ INIT_LIST_HEAD(&dev->free_txs);
+ for (i = 0; i < send_queue_depth; i++) {
+ tx = rio_alloc_tx(dev, TX_MSG_DONE);
+ if (!tx)
+ goto free_bufs;
+ rio_put_tx(dev, tx);
+ }
+
+ dev->pd = RE_PTR_INV(ib_alloc_pd(dev->ib_dev, 0));
+ if (IS_ERR(dev->pd)) {
+ TRACE("ib_alloc_pd failed: dev: 0x%p\n", dev);
+ goto free_bufs;
+ }
+
+ if (pcs_ib_mr_pool_init(&dev->ib_mr_pool, dev->pd, IB_MR_TYPE_MEM_REG,
+ max_num_sg,
+ queue_depth * 2)) {
+ TRACE("pcs_ib_mr_pool_init failed: dev: 0x%p\n", dev);
+ goto free_pd;
+ }
+
+ if (pcs_rdma_mr_pool_init(&dev->sd_mr_pool, RDMA_MAX_MSG_PAYLOAD,
+ queue_depth, dev->ib_dev, dev->pd, DMA_TO_DEVICE,
+ GFP_NOIO, &dev->ib_mr_pool)) {
+ TRACE("pcs_rdma_mr_pool_init failed: dev: 0x%p\n", dev);
+ goto free_ib_mr;
+ }
+
+ if (pcs_rdma_mr_pool_init(&dev->rd_mr_pool, RDMA_MAX_MSG_PAYLOAD,
+ queue_depth, dev->ib_dev, dev->pd, DMA_FROM_DEVICE,
+ GFP_NOIO, &dev->ib_mr_pool)) {
+ TRACE("pcs_rdma_mr_pool_init failed: dev: 0x%p\n", dev);
+ goto free_sd_mr;
+ }
+
+ return dev;
+
+free_sd_mr:
+ pcs_rdma_mr_pool_destroy(&dev->sd_mr_pool);
+free_ib_mr:
+ pcs_ib_mr_pool_destroy(&dev->ib_mr_pool);
+free_pd:
+ ib_dealloc_pd(dev->pd);
+free_bufs:
+ while ((tx = rio_get_tx(dev)))
+ rio_free_tx(dev, tx);
+ kfree(dev);
+ return NULL;
+}
+
+static void pcs_rdma_device_destroy(struct pcs_rdma_device *dev)
+{
+ struct rio_tx *tx;
+
+ pcs_rdma_mr_pool_destroy(&dev->rd_mr_pool);
+ pcs_rdma_mr_pool_destroy(&dev->sd_mr_pool);
+ pcs_ib_mr_pool_destroy(&dev->ib_mr_pool);
+
+ ib_dealloc_pd(dev->pd);
+
+ while ((tx = rio_get_tx(dev)))
+ rio_free_tx(dev, tx);
+
+ kfree(dev);
+}
+
+struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
+ int queue_depth, struct pcs_rpc *ep)
+{
+ struct pcs_rdmaio *rio;
+ struct rio_rx *rx;
+ struct ib_cq_init_attr cq_attr = {};
+ struct ib_qp_init_attr qp_attr = {};
+ int recv_queue_depth = queue_depth * 2 + 2;
+ int send_queue_depth = queue_depth * 4 + 4;
+ int rx_descs_siz = recv_queue_depth * sizeof(struct rio_rx);
+ int rx_bufs_siz = recv_queue_depth * RIO_MSG_SIZE;
+ static atomic_t comp_vector = ATOMIC_INIT(-1);
+ unsigned int cq_count = rdmaio_cq_count;
+ unsigned int cq_period = rdmaio_cq_period;
+ int max_recv_wr;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ BUILD_BUG_ON((TX_WAIT_FOR_READ_ACK - TX_WAIT_FOR_TX_COMPL) != 1);
+ BUILD_BUG_ON((TX_MSG_DONE - TX_WAIT_FOR_READ_ACK) != 1);
+
+ if (queue_depth < RIO_QUEUE_DEPTH)
+ queue_depth = RIO_QUEUE_DEPTH;
+ else if (queue_depth > RIO_MAX_QUEUE_DEPTH)
+ queue_depth = RIO_MAX_QUEUE_DEPTH;
+
+ rio = RE_NULL(kzalloc(sizeof(struct pcs_rdmaio), GFP_NOIO));
+ if (!rio)
+ return NULL;
+
+ rio->netio.parent = pcs_rpc_get(ep);
+ rio->id.event_handler = pcs_rdma_io_event_handler;
+
+ init_waitqueue_head(&rio->waitq);
+
+ rio->rio_state = RIO_STATE_CONNECTING;
+ rio->rio_error = PCS_ERR_NET_ABORT;
+
+ rio->hdr_size = hdr_size;
+ rio->queue_depth = queue_depth;
+ rio->send_queue_depth = send_queue_depth;
+
+ INIT_LIST_HEAD(&rio->tx_jobs);
+ INIT_LIST_HEAD(&rio->pended_rxs);
+
+ rio->n_peer_credits = queue_depth;
+ rio->n_reserved_credits = queue_depth;
+ rio->n_os_credits = 0;
+ rio->n_th_credits = queue_depth / 2;
+
+ rio->cmid = cmid;
+
+ INIT_LIST_HEAD(&rio->write_queue);
+ INIT_LIST_HEAD(&rio->reserved_queue);
+
+ rio->no_kick = 0;
+ rio->throttled = 0;
+
+ INIT_LIST_HEAD(&rio->active_txs);
+
+ rio->xid_generator = 0;
+
+ rio->conn_req.magic = RIO_MAGIC;
+ rio->conn_req.version = RIO_VERSION;
+ rio->conn_req.queue_depth = queue_depth;
+ rio->conn_req.msg_size = RIO_MSG_SIZE;
+
+ rio->rx_descs = RE_NULL(kzalloc(rx_descs_siz, GFP_NOIO | __GFP_NOWARN));
+ if (!rio->rx_descs)
+ goto free_rio;
+
+ rio->rx_bufs_size = rx_bufs_siz;
+ rio->rx_bufs = RE_NULL(ib_dma_alloc_coherent(rio->cmid->device, rio->rx_bufs_size,
+ &rio->rx_bufs_dma,
+ GFP_NOIO | __GFP_NOWARN));
+ if (!rio->rx_bufs) {
+ TRACE("ib_dma_alloc_coherent failed: rio: 0x%p\n", rio);
+ goto free_desc;
+ }
+
+ rio->dev = pcs_rdma_device_create(rio->cmid, queue_depth, send_queue_depth);
+ if (!rio->dev) {
+ TRACE("pcs_rdma_device_create failed: rio: 0x%p\n", rio);
+ goto free_bufs;
+ }
+
+ max_recv_wr = recv_queue_depth;
+ rio->max_send_wr = max_t(int, send_queue_depth * 4,
+ DIV_ROUND_UP(send_queue_depth * (RDMA_MAX_MSG_PAYLOAD >> PAGE_SHIFT),
+ rio->cmid->device->attrs.max_send_sge));
+
+ cq_attr.cqe = max_recv_wr + rio->max_send_wr;
+ cq_attr.comp_vector = (unsigned int)atomic_inc_return(&comp_vector) %
+ rio->cmid->device->num_comp_vectors;
+ rio->cq = RE_PTR_INV(ib_create_cq(rio->cmid->device, pcs_rdma_cq_comp_handler,
+ pcs_rdma_cq_event_handler, rio, &cq_attr));
+ if (IS_ERR(rio->cq)) {
+ TRACE("ib_alloc_cq failed: rio: 0x%p\n", rio);
+ goto free_dev;
+ }
+ if (cq_count && cq_period) {
+ int ret = rdma_set_cq_moderation(rio->cq, cq_count, cq_period);
+ TRACE("rio: 0x%p, set cq moderation: cq_count %u, cq_period: %u, ret: %d\n",
+ rio, cq_count, cq_period, ret);
+ }
+ ib_req_notify_cq(rio->cq, IB_CQ_NEXT_COMP);
+
+ qp_attr.event_handler = pcs_rdma_qp_event_handler;
+ qp_attr.qp_context = rio;
+ qp_attr.cap.max_send_wr = rio->max_send_wr;
+ qp_attr.cap.max_send_sge = rio->cmid->device->attrs.max_send_sge;
+ qp_attr.cap.max_recv_wr = max_recv_wr;
+ qp_attr.cap.max_recv_sge = 1;
+ qp_attr.send_cq = rio->cq;
+ qp_attr.recv_cq = rio->cq;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+
+ TRACE("rio: 0x%p, max_send wr/sge: %u/%u, max_recv wr/sge: %u/%u\n",
+ rio, qp_attr.cap.max_send_wr, qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_wr, qp_attr.cap.max_recv_sge);
+ if (RE_INV(rdma_create_qp(rio->cmid, rio->dev->pd, &qp_attr))) {
+ TRACE("rdma_create_qp failed: rio: 0x%p\n", rio);
+ goto free_cq;
+ }
+
+ for (rx = rio->rx_descs; rx - rio->rx_descs < recv_queue_depth; rx++)
+ if (rio_rx_post(rio, rx, RIO_MSG_SIZE)) {
+ TRACE("rio_rx_post failed: rio: 0x%p\n", rio);
+ break;
+ }
+
+ if (rio->n_rx_posted != recv_queue_depth)
+ goto free_qp;
+
+ TRACE("rio: 0x%p, dev: 0x%p, queue_depth: %d\n", rio, rio->dev, queue_depth);
+
+ return rio;
+
+free_qp:
+ rdma_destroy_qp(rio->cmid);
+free_cq:
+ ib_destroy_cq(rio->cq);
+free_dev:
+ pcs_rdma_device_destroy(rio->dev);
+free_bufs:
+ ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
+ rio->rx_bufs, rio->rx_bufs_dma);
+free_desc:
+ kfree(rio->rx_descs);
+free_rio:
+ pcs_rpc_put(rio->netio.parent);
+ kfree(rio);
+ return NULL;
+}
+
+static void rio_cleanup(struct pcs_rdmaio *rio)
+{
+ rio_perform_tx_jobs(rio);
+
+ while (!list_empty(&rio->write_queue)) {
+ struct pcs_msg * msg = rio_dequeue_msg(rio);
+
+ pcs_msg_sent(msg);
+ pcs_set_local_error(&msg->error, rio->rio_error);
+ msg->done(msg);
+ }
+
+ while (!list_empty(&rio->reserved_queue)) {
+ struct pcs_msg * msg = rio_dequeue_reserved_msg(rio);
+
+ pcs_msg_sent(msg);
+ pcs_set_local_error(&msg->error, rio->rio_error);
+ msg->done(msg);
+ }
+
+ while (!list_empty(&rio->active_txs)) {
+ struct rio_tx *tx = list_first_entry(&rio->active_txs,
+ struct rio_tx, list);
+ struct pcs_msg *msg = tx->msg;
+ list_del(&tx->list);
+
+ BUG_ON(!msg);
+ rio_put_tx(rio->dev, tx);
+ pcs_set_local_error(&msg->error, rio->rio_error);
+ rio_msg_sent(rio, NULL, msg, 1);
+ }
+}
+
+static void rio_abort(struct pcs_rdmaio *rio, int error)
+{
+ struct pcs_netio *netio = &rio->netio;
+ struct ib_qp_attr qp_attr = { .qp_state = IB_QPS_ERR };
+
+ if (rio->rio_state == RIO_STATE_ABORTED) /* already handled */
+ return;
+
+ rio->rio_state = RIO_STATE_ABORTED;
+ rio->rio_error = error;
+
+ if (rdma_disconnect(rio->cmid))
+ TRACE("rdma_disconnect failed: rio: 0x%p\n", rio);
+
+ if (ib_modify_qp(rio->cmid->qp, &qp_attr, IB_QP_STATE))
+ TRACE("ib_modify_qp failed: rio: 0x%p\n", rio);
+
+ if (netio->eof) {
+ void (*eof)(struct pcs_netio *) = netio->eof;
+ netio->eof = NULL;
+ (*eof)(netio);
+ }
+}
+
+static LLIST_HEAD(rio_destroy_list);
+
+static void rio_destroy(struct work_struct *work)
+{
+ struct llist_node *list = llist_del_all(&rio_destroy_list);
+ struct pcs_rdmaio *rio, *tmp;
+
+ if (unlikely(!list))
+ return;
+
+ llist_for_each_entry_safe(rio, tmp, list, destroy_node) {
+ struct pcs_rpc *ep = rio->netio.parent;
+
+ mutex_lock(&ep->mutex);
+
+ TRACE("rio: 0x%p\n", rio);
+
+ while (rio->n_rx_posted || rio->n_tx_posted) {
+ rio_req_notify_cq(rio);
+ wait_event_timeout(rio->waitq, rio_comp_perform(rio),
+ ep->params.response_timeout);
+ }
+ rio_cleanup(rio);
+
+ rdma_destroy_qp(rio->cmid);
+ ib_destroy_cq(rio->cq);
+
+ BUG_ON(!list_empty(&rio->tx_jobs));
+ BUG_ON(!list_empty(&rio->write_queue));
+ BUG_ON(!list_empty(&rio->reserved_queue));
+ BUG_ON(!list_empty(&rio->active_txs));
+ BUG_ON(rio->dev->free_txs_cnt != rio->send_queue_depth);
+
+ pcs_rdma_device_destroy(rio->dev);
+ ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
+ rio->rx_bufs, rio->rx_bufs_dma);
+ kfree(rio->rx_descs);
+
+ rdma_destroy_id(rio->cmid);
+
+ memset(rio, 0xFF, sizeof(*rio));
+ kfree(rio);
+
+ mutex_unlock(&ep->mutex);
+
+ pcs_rpc_put(ep);
+ }
+}
+
+static DECLARE_WORK(rio_destroy_work, rio_destroy);
+
+void pcs_rdma_destroy(struct pcs_rdmaio *rio)
+{
+ struct pcs_netio *netio = &rio->netio;
+ struct pcs_rpc *ep = netio->parent;
+
+ TRACE("rio: 0x%p\n", rio);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ netio->eof = NULL;
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+
+ if (llist_add(&rio->destroy_node, &rio_destroy_list))
+ queue_work(pcs_cleanup_wq, &rio_destroy_work);
+}
+
+void pcs_rdma_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+ struct pcs_rdmaio *rio = rio_from_ioconn(ioconn);
+ struct pcs_rpc *ep = rio->netio.parent;
+
+ TRACE("rio: 0x%p\n", rio);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ BUG_ON(rio->rio_state != RIO_STATE_ABORTED);
+ if (llist_add(&rio->destroy_node, &rio_destroy_list))
+ queue_work(pcs_cleanup_wq, &rio_destroy_work);
+}
+
+static void pcs_rdma_throttle(struct pcs_netio *netio)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ TRACE("rio: 0x%p\n", rio);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->throttled || rio->rio_state != RIO_STATE_ESTABLISHED)
+ return;
+
+ rio->throttled = 1;
+}
+
+static void pcs_rdma_unthrottle(struct pcs_netio *netio)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ TRACE("rio: 0x%p\n", rio);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (!rio->throttled || rio->rio_state != RIO_STATE_ESTABLISHED)
+ return;
+
+ rio->throttled = 0;
+
+ if (!list_empty(&rio->pended_rxs))
+ rio_handle_pended_rxs(rio);
+}
+
+static void pcs_rdma_send_msg(struct pcs_netio *netio, struct pcs_msg *msg)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED) {
+ pcs_msg_sent(msg);
+ pcs_set_local_error(&msg->error, rio->rio_error);
+ msg->done(msg);
+ return;
+ }
+
+ msg->netio = netio;
+
+ list_add_tail(&msg->list, &rio->write_queue);
+ msg->start_time = jiffies;
+ msg->stage = PCS_MSG_STAGE_SEND;
+
+ rio_kick_write_queue(rio);
+}
+
+static int pcs_rdma_cancel_msg(struct pcs_msg *msg)
+{
+ struct pcs_rpc *ep = msg->netio->parent;
+
+ TRACE("msg: 0x%p\n", msg);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (list_empty(&msg->list))
+ return -EBUSY;
+
+ list_del_init(&msg->list);
+ msg->stage = PCS_MSG_STAGE_SENT;
+
+ return 0;
+}
+
+static void pcs_rdma_abort_io(struct pcs_netio *netio, int error)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ TRACE("rio: 0x%p\n", rio);
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED)
+ return;
+
+ netio->eof = NULL;
+ rio_abort(rio, error);
+}
+
+static void pcs_rdma_xmit(struct pcs_netio *netio)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED)
+ return;
+
+ if (test_bit(PCS_RDMA_IO_ERROR, &rio->io_flags))
+ rio_abort(rio, PCS_ERR_NET_ABORT);
+
+ while (test_and_clear_bit(PCS_RDMA_IO_CQE, &rio->io_flags)) {
+ do {
+ rio_comp_perform(rio);
+ } while (rio_req_notify_cq(rio) > 0);
+ }
+
+ rio_kick_write_queue(rio);
+}
+
+static int pcs_rdma_flush(struct pcs_netio *netio)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED)
+ return 0;
+
+ return test_bit(PCS_RDMA_IO_CQE, &rio->io_flags);
+}
+
+static unsigned long pcs_rdma_next_timeout(struct pcs_netio *netio)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+ struct pcs_msg *msg;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (list_empty(&rio->write_queue))
+ return 0;
+
+ msg = list_first_entry(&rio->write_queue, struct pcs_msg, list);
+ return msg->start_time + rio->send_timeout;
+}
+
+static int pcs_rdma_sync_send(struct pcs_netio *netio, struct pcs_msg *msg)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+ int credits = rio->n_os_credits;
+ struct pcs_remote_buf *rb;
+ struct pcs_rdma_ack *rack;
+ struct rio_tx *tx;
+ struct rio_cqe *cqe;
+ int hdr_len;
+ int ret;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED ||
+ !rio->n_peer_credits)
+ return -EINVAL;
+
+ tx = RE_NULL(rio_get_tx(rio->dev));
+ if (!tx)
+ return -ENOMEM;
+
+ hdr_len = rio_init_msg(tx->buf, msg->size, credits, SUBMIT_REGULAR, &rb, &rack);
+ if (hdr_len + msg->size > RDMA_THRESHOLD) {
+ rio_put_tx(rio->dev, tx);
+ return -EINVAL;
+ }
+
+ memcpy(tx->buf + hdr_len, msg->_inline_buffer, msg->size);
+ rio_update_msg_immediate(tx->buf, msg->size);
+
+ ret = rio_tx_post_send(rio, tx, hdr_len + msg->size,
+ NULL, NULL);
+ if (ret) {
+ rio_put_tx(rio->dev, tx);
+ return ret;
+ }
+ rio->n_os_credits -= credits;
+ rio->n_peer_credits--;
+
+ wait_event_timeout(rio->waitq, (cqe = rio_poll_cq(rio)),
+ ep->params.connect_timeout);
+ if (rio_req_notify_cq(rio) > 0)
+ pcs_rpc_kick_queue(ep);
+
+ if (!cqe)
+ return -ETIMEDOUT;
+
+ ret = cqe == &tx->cqe && cqe->status == IB_WC_SUCCESS ?
+ 0 : -EFAULT;
+ cqe->done(cqe, true);
+
+ return ret;
+}
+
+static int pcs_rdma_sync_recv(struct pcs_netio *netio, struct pcs_msg **msg)
+{
+ struct pcs_rdmaio *rio = rio_from_netio(netio);
+ struct pcs_rpc *ep = netio->parent;
+ struct rio_rx *rx;
+ struct rio_cqe *cqe;
+ char *buf;
+ int type;
+ char *payload;
+ int payload_size;
+ int credits;
+ struct pcs_remote_buf *rb = NULL;
+ struct pcs_rdma_ack *rack;
+ int ret = 0;
+
+ BUG_ON(!mutex_is_locked(&ep->mutex));
+
+ if (rio->rio_state != RIO_STATE_ESTABLISHED)
+ return -EINVAL;
+
+ wait_event_timeout(rio->waitq, (cqe = rio_poll_cq(rio)),
+ ep->params.connect_timeout);
+ if (rio_req_notify_cq(rio) > 0)
+ pcs_rpc_kick_queue(ep);
+
+ if (!cqe)
+ return -ETIMEDOUT;
+
+ if (cqe->done != rio_rx_done || cqe->status != IB_WC_SUCCESS) {
+ ret = -EFAULT;
+ goto out;
+ }
+
+ rx = container_of(cqe, struct rio_rx, cqe);
+
+ buf = rx2buf(rio, rx);
+ type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+ rio->queue_depth);
+ if (type != RIO_MSG_IMMEDIATE || rb) {
+ ret = -EFAULT;
+ goto out;
+ }
+ rio->n_peer_credits += credits;
+
+ *msg = pcs_rpc_alloc_output_msg(payload_size);
+ if (!*msg) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ memcpy((*msg)->_inline_buffer, payload, payload_size);
+
+out:
+ cqe->done(cqe, true);
+ return ret;
+}
+
+struct pcs_netio_tops pcs_rdma_netio_tops = {
+ .throttle = pcs_rdma_throttle,
+ .unthrottle = pcs_rdma_unthrottle,
+ .send_msg = pcs_rdma_send_msg,
+ .cancel_msg = pcs_rdma_cancel_msg,
+ .abort_io = pcs_rdma_abort_io,
+ .xmit = pcs_rdma_xmit,
+ .flush = pcs_rdma_flush,
+ .next_timeout = pcs_rdma_next_timeout,
+ .sync_send = pcs_rdma_sync_send,
+ .sync_recv = pcs_rdma_sync_recv,
+};
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.h b/fs/fuse/kio/pcs/pcs_rdma_io.h
new file mode 100644
index 000000000000..912be17e574d
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.h
@@ -0,0 +1,114 @@
+#ifndef _PCS_RMDA_IO_H_
+#define _PCS_RMDA_IO_H_ 1
+
+#include <linux/types.h>
+#include <linux/wait.h>
+#include <linux/workqueue.h>
+#include <linux/llist.h>
+
+#include <rdma/rdma_cm.h>
+
+#include "pcs_types.h"
+#include "pcs_rpc.h"
+#include "pcs_sock_io.h"
+#include "pcs_error.h"
+#include "pcs_net.h"
+#include "pcs_rdma_prot.h"
+#include "log.h"
+
+#define RIO_IB_WC_MAX 64
+
+enum {
+ RIO_STATE_CONNECTING, /* needn't rdma_disconnect (yet) */
+ RIO_STATE_ESTABLISHED, /* main "working" state */
+ RIO_STATE_ABORTED, /* rio_abort was called at least once */
+};
+
+extern struct pcs_netio_tops pcs_rdma_netio_tops;
+
+struct pcs_rdma_id
+{
+ int (*event_handler)(struct rdma_cm_id *cmid, struct rdma_cm_event *event);
+};
+
+struct pcs_rdmaio
+{
+ /*
+ * That's not very obvious, we need two poll-able objects: netio.iocomp
+ * and compc. The former handles DISCONNECT event. The latter (compc)
+ * handles WQE completion events. */
+ struct pcs_netio netio;
+ struct pcs_rdma_id id;
+
+ struct llist_node destroy_node;
+
+ wait_queue_head_t waitq;
+ unsigned long io_flags; /* atomic bit ops */
+
+ int rio_state; /* see enum above */
+ int rio_error;
+
+ int hdr_size; /* minimum allowed payload */
+
+ /*
+ * It's easier to have the same queue_depth for both directions.
+ * rdma_connect gets a value from a tunable and sends it via
+ * conn_param; rdma_listen sees it in conn request event and
+ * blindly accepts the value. */
+ int queue_depth;
+ int send_queue_depth;
+ int max_send_wr;
+
+ int send_timeout;
+
+ struct ib_wc wcs[RIO_IB_WC_MAX];
+ int wc_cnt;
+ int wc_idx;
+
+ struct list_head tx_jobs; /* list head of TX jobs */
+
+ struct rio_rx *rx_descs; /* plain array of RX descriptors */
+ char *rx_bufs; /* MR-ed area for payload of RXs */
+ size_t rx_bufs_size;
+ dma_addr_t rx_bufs_dma;
+ struct list_head pended_rxs; /* list head of pended RX frames */
+
+ int n_rx_posted; /* # posted RXs */
+ int n_tx_posted; /* # posted TXs */
+
+ int n_peer_credits; /* what we think about peer's n_rx_posted */
+ int n_reserved_credits; /* limits # RDMA in flight */
+
+ int n_os_credits; /* outstanding credits: # RXs we re-post-ed,
+ * but have not returned to our peer (yet) */
+
+ int n_th_credits; /* threshold: when to return outstanding
+ * credits urgently */
+
+ struct pcs_rdma_device *dev;
+ struct rdma_cm_id *cmid;
+ struct ib_cq *cq;
+
+ struct list_head write_queue;
+ struct list_head reserved_queue; /* out of reserved credits */
+
+ int no_kick; /* do not kick processing write_queue */
+ int throttled; /* pcs_rpc asked us to quiesce */
+
+ struct list_head active_txs; /* list head of active TX frames: tx->msg->done()
+ * is postponed until ACK from our peer */
+
+ u64 xid_generator; /* provides unique (per rio) xids */
+
+ struct pcs_rdmaio_conn_req conn_req;
+};
+
+#define rio_from_netio(nio) container_of(nio, struct pcs_rdmaio, netio)
+#define rio_from_ioconn(conn) container_of(conn, struct pcs_rdmaio, netio.ioconn)
+
+struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
+ int queue_depth, struct pcs_rpc *ep);
+void pcs_rdma_destroy(struct pcs_rdmaio *rio);
+void pcs_rdma_ioconn_destruct(struct pcs_ioconn *ioconn);
+
+#endif /* _PCS_RDMA_IO_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rdma_prot.h b/fs/fuse/kio/pcs/pcs_rdma_prot.h
new file mode 100644
index 000000000000..4a311cdb6f73
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_prot.h
@@ -0,0 +1,119 @@
+#ifndef _PCS_RDMA_PROT_H_
+#define _PCS_RDMA_PROT_H_ 1
+
+/* PCS RDMA network protocol v1 */
+
+#define RIO_MAGIC 0x5078614d
+#define RIO_VERSION 1
+
+#define RIO_MSG_SIZE (2*4096) /* max size of any wire message */
+
+#define RIO_QUEUE_DEPTH 8 /* TODO: make it tunable */
+#define RIO_MAX_QUEUE_DEPTH 128 /* for conn_param sanity checks */
+
+/* negotiated by rdma_connect/rdma_accept */
+struct pcs_rdmaio_conn_req {
+ u32 magic; /* RIO_MAGIC */
+ u32 version; /* RIO_VERSION */
+ u32 queue_depth; /* RIO_QUEUE_DEPTH */
+ u32 msg_size; /* RIO_MSG_SIZE */
+} __attribute__((aligned(8)));
+
+/* negotiated by rdma_connect/rdma_accept */
+struct pcs_rdmaio_rej {
+ struct pcs_rdmaio_conn_req cr;
+ u32 error; /* errno */
+} __attribute__((aligned(8)));
+
+/* "type" field of pcs_rdmaio_msg */
+enum {
+ RIO_MSG_IMMEDIATE = 0,
+ RIO_MSG_NOOP,
+ RIO_MSG_RDMA_READ_REQ,
+ RIO_MSG_RDMA_READ_ACK,
+ RIO_MAX_TYPE_VALUE
+};
+
+/* sent/received by rdma_post_send/rdma_post_recv */
+struct pcs_rdmaio_hdr {
+ u32 magic; /* RIO_MAGIC */
+ u16 version; /* RIO_VERSION */
+ u16 type; /* RIO_MSG_IMMEDIATE/... */
+ u32 size; /* total size of wire message */
+ u32 credits; /* # credits to return */
+} __attribute__((aligned(8)));
+
+struct pcs_remote_buf {
+ u64 xid;
+ u64 rbuf;
+ u32 rkey;
+ u32 rlen;
+} __attribute__((aligned(8)));
+
+struct pcs_rdma_ack {
+ u64 xid;
+ u32 status;
+} __attribute__((aligned(8)));
+
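+/*
+ * Validate the header of an incoming wire message and locate its payload.
+ * On success the message type (RIO_MSG_*) is returned and *payload,
+ * *payload_size and *credits are filled in; for RDMA read requests/acks
+ * *rb/*rack point at the corresponding structure inside the payload.
+ * Returns -1 if the header is malformed.
+ */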
+static inline int rio_parse_hdr(char *buf, char **payload, int *payload_size,
+ int *credits, struct pcs_remote_buf **rb,
+ struct pcs_rdma_ack **rack,
+ int queue_depth)
+{
+ struct pcs_rdmaio_hdr *hdr = (struct pcs_rdmaio_hdr *)buf;
+
+ if (hdr->magic != RIO_MAGIC) {
+ TRACE("wrong rio msg magic: 0x%x\n", hdr->magic);
+ return -1;
+ }
+
+ if (hdr->version != RIO_VERSION) {
+ TRACE("wrong rio msg version: 0x%x\n", hdr->version);
+ return -1;
+ }
+
+ if (hdr->type >= RIO_MAX_TYPE_VALUE) {
+ TRACE("wrong rio msg type: 0x%x\n", hdr->type);
+ return -1;
+ }
+
+ if (hdr->size < sizeof(*hdr) || hdr->size > RIO_MSG_SIZE) {
+ TRACE("wrong rio msg size: 0x%x\n", hdr->size);
+ return -1;
+ }
+
+ if (hdr->credits > queue_depth) {
+ TRACE("wrong rio msg credits: 0x%x\n", hdr->credits);
+ return -1;
+ }
+
+ if (hdr->type == RIO_MSG_RDMA_READ_REQ &&
+ hdr->size - sizeof(*hdr) < sizeof(struct pcs_remote_buf)) {
+ TRACE("short rdma read req: 0x%x\n", hdr->size);
+ return -1;
+ }
+
+ if (hdr->type == RIO_MSG_RDMA_READ_ACK &&
+ hdr->size != sizeof(*hdr) + sizeof(struct pcs_rdma_ack)) {
+ TRACE("wrong size rdma read ack: 0x%x\n", hdr->size);
+ return -1;
+ }
+
+ *payload = buf + sizeof(*hdr);
+ *payload_size = hdr->size - sizeof(*hdr);
+ *credits = hdr->credits;
+
+ if (hdr->type == RIO_MSG_RDMA_READ_REQ) {
+ *rb = (struct pcs_remote_buf *)*payload;
+ *payload += sizeof(struct pcs_remote_buf);
+ *payload_size -= sizeof(struct pcs_remote_buf);
+ } else if (hdr->type == RIO_MSG_RDMA_READ_ACK) {
+ *rack = (struct pcs_rdma_ack *)*payload;
+ *payload += sizeof(struct pcs_rdma_ack);
+ *payload_size -= sizeof(struct pcs_rdma_ack);
+ }
+
+ return hdr->type;
+}
+
+#endif /* _PCS_RDMA_PROT_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_rdma_rw.c b/fs/fuse/kio/pcs/pcs_rdma_rw.c
new file mode 100644
index 000000000000..15659083b889
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_rw.c
@@ -0,0 +1,815 @@
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+
+#include "pcs_rdma_rw.h"
+
+static int dma_dir_to_ib_reg_access(enum dma_data_direction dir)
+{
+ switch (dir) {
+ case DMA_BIDIRECTIONAL:
+ return IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE |
+ IB_ACCESS_LOCAL_WRITE;
+ case DMA_TO_DEVICE:
+ return IB_ACCESS_REMOTE_READ;
+ case DMA_FROM_DEVICE:
+ return IB_ACCESS_REMOTE_WRITE |
+ IB_ACCESS_LOCAL_WRITE;
+ default:
+ return 0;
+ }
+}
+
+static unsigned int dma_dir_to_it_dir(enum dma_data_direction dir)
+{
+ return dir == DMA_TO_DEVICE ? WRITE : READ;
+}
+
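+/*
+ * Build a scatterlist over the pages backing msg in [offset, end_offset).
+ * The first pass over the message iterator only counts segments, the
+ * second one pins the pages (iov_iter_get_pages) and fills the sg entries.
+ * Unless allow_gaps is set, interior segment boundaries must be
+ * page-aligned.
+ */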
+int pcs_sgl_buf_init_from_msg(struct pcs_sgl_buf *sbuf, unsigned int dir, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, gfp_t gfp, bool allow_gaps)
+{
+ struct iov_iter it;
+ size_t msg_offset;
+ struct scatterlist *sg;
+ int ret, i;
+
+ if (offset >= end_offset || end_offset > msg->size)
+ return -EINVAL;
+
+ sbuf->pg_cnt = 0;
+ sbuf->sg_cnt = 0;
+ sbuf->page_clean = put_page;
+
+ iov_iter_kvec(&it, dir, NULL, 0, 0);
+ msg_offset = offset;
+ while (msg_offset < end_offset) {
+ size_t len;
+
+ if (!iov_iter_count(&it))
+ msg->get_iter(msg, msg_offset, &it, dir);
+
+ len = iov_iter_single_seg_count(&it);
+ if (len > end_offset - msg_offset)
+ len = end_offset - msg_offset;
+
+ iov_iter_advance(&it, len);
+ msg_offset += len;
+ sbuf->sg_cnt++;
+ }
+
+ sbuf->pages = RE_NULL(kzalloc(sizeof(struct page*) * sbuf->sg_cnt,
+ gfp));
+ if (!sbuf->pages)
+ return -ENOMEM;
+
+ sbuf->sgl = RE_NULL(kzalloc(sizeof(struct scatterlist) * sbuf->sg_cnt,
+ gfp));
+ if (!sbuf->sgl) {
+ ret = -ENOMEM;
+ goto fail_pg;
+ }
+ sg_init_table(sbuf->sgl, sbuf->sg_cnt);
+
+ iov_iter_kvec(&it, dir, NULL, 0, 0);
+ msg_offset = offset;
+ sg = sbuf->sgl;
+ while (msg_offset < end_offset) {
+ size_t off;
+ ssize_t len;
+
+ if (!iov_iter_count(&it))
+ msg->get_iter(msg, msg_offset, &it, dir);
+
+ len = iov_iter_get_pages(&it, sbuf->pages + sbuf->pg_cnt,
+ PAGE_SIZE, 1, &off);
+ if (len <= 0) {
+ ret = -EINVAL;
+ goto fail_sgl;
+ }
+
+ if (len > end_offset - msg_offset)
+ len = end_offset - msg_offset;
+
+ sg_set_page(sg, sbuf->pages[sbuf->pg_cnt++], len, off);
+
+ if (!allow_gaps &&
+ ((msg_offset != offset && !IS_ALIGNED(off, PAGE_SIZE)) ||
+ ((msg_offset + len != end_offset) && !IS_ALIGNED(off + len, PAGE_SIZE)))) {
+ ret = -EINVAL;
+ goto fail_sgl;
+ }
+
+ sg = sg_next(sg);
+ iov_iter_advance(&it, len);
+ msg_offset += len;
+ }
+
+ return 0;
+
+fail_sgl:
+ kfree(sbuf->sgl);
+fail_pg:
+ for (i = 0; i < sbuf->pg_cnt; i++)
+ sbuf->page_clean(sbuf->pages[i]);
+ kfree(sbuf->pages);
+
+ return ret;
+}
+
+static void pcs_sgl_buf_free_page(struct page *page)
+{
+ __free_page(page);
+}
+
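+/* Allocate enough fresh pages to cover 'size' bytes and build an sgl over them. */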
+int pcs_sgl_buf_init(struct pcs_sgl_buf *sbuf, size_t size, gfp_t gfp)
+{
+ struct scatterlist *sg;
+ int ret, i;
+
+ if (size == 0)
+ return -EINVAL;
+
+ sbuf->pg_cnt = 0;
+ sbuf->sg_cnt = PAGE_ALIGN(size) >> PAGE_SHIFT;
+ sbuf->page_clean = pcs_sgl_buf_free_page;
+
+ sbuf->pages = RE_NULL(kzalloc(sizeof(struct page*) * sbuf->sg_cnt,
+ gfp));
+ if (!sbuf->pages)
+ return -ENOMEM;
+
+ sbuf->sgl = RE_NULL(kzalloc(sizeof(struct scatterlist) * sbuf->sg_cnt,
+ gfp));
+ if (!sbuf->sgl) {
+ ret = -ENOMEM;
+ goto fail_pg;
+ }
+ sg_init_table(sbuf->sgl, sbuf->sg_cnt);
+
+ for_each_sg(sbuf->sgl, sg, sbuf->sg_cnt, i) {
+ size_t sg_len = min_t(size_t, size, PAGE_SIZE);
+ sbuf->pages[sbuf->pg_cnt] = RE_NULL(alloc_page(gfp));
+ if (!sbuf->pages[sbuf->pg_cnt]) {
+ ret = -ENOMEM;
+ goto fail_sgl;
+ }
+ BUG_ON(!sg_len);
+ sg_set_page(sg, sbuf->pages[sbuf->pg_cnt], sg_len, 0);
+ size -= sg_len;
+ sbuf->pg_cnt++;
+ }
+
+ return 0;
+
+fail_sgl:
+ kfree(sbuf->sgl);
+fail_pg:
+ for (i = 0; i < sbuf->pg_cnt; i++)
+ sbuf->page_clean(sbuf->pages[i]);
+ kfree(sbuf->pages);
+
+ return ret;
+}
+
+void pcs_sgl_buf_destroy(struct pcs_sgl_buf *sbuf)
+{
+ int i;
+
+ kfree(sbuf->sgl);
+
+ for (i = 0; i < sbuf->pg_cnt; i++)
+ sbuf->page_clean(sbuf->pages[i]);
+ kfree(sbuf->pages);
+}
+
+int pcs_ib_mr_pool_init(struct pcs_ib_mr_pool *pool, struct ib_pd *pd,
+ enum ib_mr_type mr_type, u32 max_num_sg, size_t mr_cnt)
+{
+ struct ib_mr *mr;
+ int ret, i;
+
+ pool->pd = pd;
+ pool->mr_type = mr_type;
+ pool->max_num_sg = max_num_sg;
+ pool->mr_cnt = mr_cnt;
+
+ spin_lock_init(&pool->lock);
+
+ INIT_LIST_HEAD(&pool->mr_list);
+ pool->used_mrs = 0;
+
+ for (i = 0; i < mr_cnt; i++) {
+ mr = RE_PTR_INV(ib_alloc_mr(pd, mr_type, max_num_sg));
+ if (IS_ERR(mr)) {
+ ret = PTR_ERR(mr);
+ goto fail;
+ }
+ list_add_tail(&mr->qp_entry, &pool->mr_list);
+ }
+
+ return 0;
+
+fail:
+ pcs_ib_mr_pool_destroy(pool);
+ return ret;
+}
+
+void pcs_ib_mr_pool_destroy(struct pcs_ib_mr_pool *pool)
+{
+ struct ib_mr *mr, *tmp;
+
+ spin_lock_irq(&pool->lock);
+ BUG_ON(pool->used_mrs);
+ spin_unlock_irq(&pool->lock);
+
+ list_for_each_entry_safe(mr, tmp, &pool->mr_list, qp_entry)
+ ib_dereg_mr(mr);
+}
+
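+/*
+ * Take an MR from the pool; if the pool is empty and we are not in
+ * interrupt context, allocate a new MR and grow the pool.
+ * Returns NULL on failure.
+ */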
+struct ib_mr* pcs_ib_mr_pool_get(struct pcs_ib_mr_pool *pool)
+{
+ struct ib_mr *mr;
+ unsigned long flags;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ mr = list_first_entry_or_null(&pool->mr_list, struct ib_mr, qp_entry);
+ if (mr) {
+ list_del(&mr->qp_entry);
+ BUG_ON(pool->used_mrs >= pool->mr_cnt);
+ pool->used_mrs++;
+ }
+ spin_unlock_irqrestore(&pool->lock, flags);
+
+ if (!mr && !in_interrupt()) {
+ mr = RE_PTR_INV(ib_alloc_mr(pool->pd, pool->mr_type,
+ pool->max_num_sg));
+ if (IS_ERR(mr))
+ return NULL;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ pool->mr_cnt++;
+ BUG_ON(pool->used_mrs >= pool->mr_cnt);
+ pool->used_mrs++;
+ spin_unlock_irqrestore(&pool->lock, flags);
+ }
+
+ return mr;
+}
+
+void pcs_ib_mr_pool_put(struct pcs_ib_mr_pool *pool, struct ib_mr *mr)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ list_add(&mr->qp_entry, &pool->mr_list);
+ BUG_ON(!pool->used_mrs);
+ pool->used_mrs--;
+ spin_unlock_irqrestore(&pool->lock, flags);
+}
+
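+/*
+ * Prepare an RDMA READ/WRITE of the msg range [offset, end_offset):
+ * DMA-map the backing pages and build a chain of ib_rdma_wr work
+ * requests, each carrying up to sge_per_wr SGEs and targeting
+ * consecutive chunks of the remote buffer.
+ */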
+int pcs_rdma_rw_init_from_msg(struct pcs_rdma_rw *rw, struct ib_device *dev,
+ enum dma_data_direction dir, u64 remote_addr, u32 rkey,
+ u32 local_dma_lkey, struct pcs_msg *msg, size_t offset,
+ size_t end_offset, gfp_t gfp, size_t sge_per_wr)
+{
+ struct scatterlist *sg;
+ struct ib_sge *sge;
+ size_t sge_cnt;
+ int ret, i, k;
+
+ if (dir != DMA_TO_DEVICE && dir != DMA_FROM_DEVICE)
+ return -EINVAL;
+
+ rw->dev = dev;
+ rw->dir = dir;
+
+ ret = pcs_sgl_buf_init_from_msg(&rw->sbuf, dma_dir_to_it_dir(dir), msg, offset, end_offset,
+ gfp, true);
+ if (ret)
+ return ret;
+
+ ret = RE_INV(ib_dma_map_sg(dev, rw->sbuf.sgl, rw->sbuf.sg_cnt,
+ dir));
+ if (ret <= 0) {
+ ret = ret < 0 ? ret : -EIO;
+ goto fail_sgl;
+ }
+ rw->sbuf.sg_cnt = ret;
+
+ rw->nr_wrs = DIV_ROUND_UP(rw->sbuf.sg_cnt, sge_per_wr);
+
+ rw->sges = RE_NULL(kzalloc(sizeof(struct ib_sge) * rw->sbuf.sg_cnt, gfp));
+ if (!rw->sges) {
+ ret = -ENOMEM;
+ goto fail_dma;
+ }
+
+ rw->wrs = RE_NULL(kzalloc(sizeof(struct ib_rdma_wr) * rw->nr_wrs, gfp));
+ if (!rw->wrs) {
+ ret = -ENOMEM;
+ goto fail_sges;
+ }
+
+ sg = rw->sbuf.sgl;
+ sge = rw->sges;
+ sge_cnt = rw->sbuf.sg_cnt;
+ for (i = 0; i < rw->nr_wrs; i++) {
+ struct ib_rdma_wr *wr = rw->wrs + i;
+ int num_sge = min_t(size_t, sge_cnt, sge_per_wr);
+
+ wr->wr.opcode = dir == DMA_FROM_DEVICE ? IB_WR_RDMA_READ :
+ IB_WR_RDMA_WRITE;
+ wr->wr.sg_list = sge;
+ wr->wr.num_sge = num_sge;
+ wr->remote_addr = remote_addr;
+ wr->rkey = rkey;
+
+ for (k = 0; k < num_sge; k++, sg = sg_next(sg)) {
+ sge->addr = sg_dma_address(sg);
+ sge->length = sg_dma_len(sg);
+ sge->lkey = local_dma_lkey;
+
+ remote_addr += sge->length;
+ sge++;
+ sge_cnt--;
+ }
+
+ if (i > 0)
+ rw->wrs[i - 1].wr.next = &wr->wr;
+ }
+
+ return 0;
+
+fail_sges:
+ kfree(rw->sges);
+fail_dma:
+ ib_dma_unmap_sg(dev, rw->sbuf.sgl, rw->sbuf.sg_cnt, dir);
+fail_sgl:
+ pcs_sgl_buf_destroy(&rw->sbuf);
+
+ return ret;
+}
+
+void pcs_rdma_rw_destroy(struct pcs_rdma_rw *rw)
+{
+ kfree(rw->wrs);
+ kfree(rw->sges);
+ ib_dma_unmap_sg(rw->dev, rw->sbuf.sgl, rw->sbuf.sg_cnt, rw->dir);
+ pcs_sgl_buf_destroy(&rw->sbuf);
+}
+
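+/*
+ * DMA-map the already built scatterlist and register it as a fast-reg MR,
+ * taken from ib_mr_pool when suitable (MEM_REG type, enough SGEs) and
+ * allocated on the fly otherwise. The LOCAL_INV/REG_MR work requests
+ * prepared in first_wr/last_wr are meant to be posted ahead of the
+ * transfer that uses the MR. Only cache-aligned segments are accepted.
+ */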
+static int pcs_rdma_mr_init_common(struct pcs_rdma_mr *mr, struct ib_device *dev,
+ struct ib_pd *pd, enum dma_data_direction dir, size_t size,
+ struct pcs_ib_mr_pool *ib_mr_pool)
+{
+ unsigned long dma_align = dma_get_cache_alignment();
+ struct scatterlist *sg;
+ int ret, i;
+
+ /* For testing fallback */
+ RE_SET(ib_mr_pool, NULL);
+
+ /* Only cache aligned DMA transfers are reliable */
+ for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i)
+ if (!IS_ALIGNED((uintptr_t)sg_virt(sg), dma_align) ||
+ !IS_ALIGNED((uintptr_t)(sg_virt(sg) + sg_dma_len(sg)),
+ dma_align))
+ return -EINVAL;
+
+ INIT_LIST_HEAD(&mr->entry);
+ mr->dev = dev;
+ mr->pd = pd;
+ mr->dir = dir;
+ mr->size = size;
+ mr->ib_mr_pool = ib_mr_pool && ib_mr_pool->mr_type == IB_MR_TYPE_MEM_REG &&
+ ib_mr_pool->max_num_sg >= mr->sbuf.sg_cnt ? ib_mr_pool : NULL;
+
+ ret = RE_INV(ib_dma_map_sg(dev, mr->sbuf.sgl, mr->sbuf.sg_cnt,
+ dir));
+ if (ret <= 0)
+ return ret < 0 ? ret : -EIO;
+ mr->sbuf.sg_cnt = ret;
+
+ if (mr->ib_mr_pool) {
+ mr->mr = RE_NULL(pcs_ib_mr_pool_get(mr->ib_mr_pool));
+ if (!mr->mr) {
+ ret = -ENOMEM;
+ goto fail_dma;
+ }
+ } else { /* fallback */
+ mr->mr = RE_PTR_INV(ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG,
+ mr->sbuf.sg_cnt));
+ if (IS_ERR(mr->mr)) {
+ ret = PTR_ERR(mr->mr);
+ goto fail_dma;
+ }
+ }
+
+ ret = RE_INV(ib_map_mr_sg(mr->mr, mr->sbuf.sgl, mr->sbuf.sg_cnt,
+ NULL, PAGE_SIZE));
+ if (ret != mr->sbuf.sg_cnt) {
+ ret = ret < 0 ? ret : -EIO;
+ goto fail_mr;
+ }
+
+ memset(&mr->inv_wr, 0, sizeof(mr->inv_wr));
+ mr->inv_wr.next = &mr->reg_wr.wr;
+ mr->inv_wr.opcode = IB_WR_LOCAL_INV;
+ mr->inv_wr.ex.invalidate_rkey = mr->mr->lkey;
+
+ ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->lkey));
+
+ memset(&mr->reg_wr, 0, sizeof(mr->reg_wr));
+ mr->reg_wr.wr.opcode = IB_WR_REG_MR;
+ mr->reg_wr.mr = mr->mr;
+ mr->reg_wr.key = mr->mr->lkey;
+ mr->reg_wr.access = dma_dir_to_ib_reg_access(dir);
+
+ mr->first_wr = mr->mr->need_inval ? &mr->inv_wr : &mr->reg_wr.wr;
+ mr->last_wr = &mr->reg_wr.wr;
+ mr->mr->need_inval = true;
+
+ return 0;
+
+fail_mr:
+ if (mr->ib_mr_pool)
+ pcs_ib_mr_pool_put(mr->ib_mr_pool, mr->mr);
+ else /* fallback */
+ ib_dereg_mr(mr->mr);
+fail_dma:
+ ib_dma_unmap_sg(dev, mr->sbuf.sgl, mr->sbuf.sg_cnt, dir);
+
+ return ret;
+}
+
+int pcs_rdma_mr_init_from_msg(struct pcs_rdma_mr *mr, struct ib_device *dev,
+ struct ib_pd *pd, enum dma_data_direction dir, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool)
+{
+ int ret;
+
+ ret = pcs_sgl_buf_init_from_msg(&mr->sbuf, dma_dir_to_it_dir(dir), msg, offset, end_offset,
+ gfp, false);
+ if (ret)
+ return ret;
+
+ ret = pcs_rdma_mr_init_common(mr, dev, pd, dir, end_offset - offset,
+ ib_mr_pool);
+ if (ret)
+ pcs_sgl_buf_destroy(&mr->sbuf);
+
+ return ret;
+}
+
+int pcs_rdma_mr_init(struct pcs_rdma_mr *mr, struct ib_device *dev, struct ib_pd *pd,
+ enum dma_data_direction dir, size_t size, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool)
+{
+ int ret;
+
+ ret = pcs_sgl_buf_init(&mr->sbuf, size, gfp);
+ if (ret)
+ return ret;
+
+ ret = pcs_rdma_mr_init_common(mr, dev, pd, dir, size, ib_mr_pool);
+ if (ret)
+ pcs_sgl_buf_destroy(&mr->sbuf);
+
+ return ret;
+}
+
+void pcs_rdma_mr_destroy(struct pcs_rdma_mr *mr)
+{
+ if (mr->ib_mr_pool)
+ pcs_ib_mr_pool_put(mr->ib_mr_pool, mr->mr);
+ else /* fallback */
+ ib_dereg_mr(mr->mr);
+ ib_dma_unmap_sg(mr->dev, mr->sbuf.sgl, mr->sbuf.sg_cnt, mr->dir);
+ pcs_sgl_buf_destroy(&mr->sbuf);
+}
+
+struct pcs_rdma_mr* pcs_rdma_mr_alloc(struct ib_device *dev, struct ib_pd *pd,
+ enum dma_data_direction dir, size_t size, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool)
+{
+ struct pcs_rdma_mr *mr;
+ int ret;
+
+ mr = RE_NULL(kzalloc(sizeof(*mr), gfp));
+ if (!mr)
+ return ERR_PTR(-ENOMEM);
+
+ ret = pcs_rdma_mr_init(mr, dev, pd, dir, size, gfp, ib_mr_pool);
+ if (ret) {
+ kfree(mr);
+ return ERR_PTR(ret);
+ }
+
+ return mr;
+}
+
+void pcs_rdma_mr_free(struct pcs_rdma_mr *mr)
+{
+ pcs_rdma_mr_destroy(mr);
+ kfree(mr);
+}
+
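+/*
+ * Sync the first 'size' bytes of the MR's DMA mapping so that the CPU
+ * (or, below, the device) sees up-to-date data.
+ */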
+void pcs_rdma_mr_sync_for_cpu(struct pcs_rdma_mr *mr, size_t size)
+{
+ struct scatterlist *sg;
+ unsigned int i;
+
+ for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i) {
+ size_t sg_len = min_t(size_t, size, sg_dma_len(sg));
+ ib_dma_sync_single_for_cpu(mr->dev,
+ sg_dma_address(sg),
+ sg_len,
+ mr->dir);
+ size -= sg_len;
+ if (!size)
+ break;
+ }
+}
+
+void pcs_rdma_mr_sync_for_device(struct pcs_rdma_mr *mr, size_t size)
+{
+ struct scatterlist *sg;
+ unsigned int i;
+
+ for_each_sg(mr->sbuf.sgl, sg, mr->sbuf.sg_cnt, i) {
+ size_t sg_len = min_t(size_t, size, sg_dma_len(sg));
+ ib_dma_sync_single_for_device(mr->dev,
+ sg_dma_address(sg),
+ sg_len,
+ mr->dir);
+ size -= sg_len;
+ if (!size)
+ break;
+ }
+}
+
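+/*
+ * Copy the range [offset, end_offset) between the message iov and the
+ * MR bounce pages: to_msg copies MR -> msg, otherwise msg -> MR.
+ */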
+int pcs_rdma_mr_sync_for_msg(struct pcs_rdma_mr *mr, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, bool to_msg)
+{
+ struct iov_iter it;
+ struct scatterlist *mr_sg;
+ size_t mr_off;
+ unsigned int dir = to_msg ? READ : WRITE;
+
+ if (offset >= end_offset || end_offset > msg->size ||
+ end_offset - offset > mr->size)
+ return -EINVAL;
+
+ iov_iter_kvec(&it, dir, NULL, 0, 0);
+
+ mr_sg = mr->sbuf.sgl;
+ mr_off = 0;
+
+ while (offset < end_offset) {
+ size_t msg_off, msg_len, res;
+
+ if (!iov_iter_count(&it))
+ msg->get_iter(msg, offset, &it, dir);
+
+ msg_len = iov_iter_count(&it);
+ if (msg_len > end_offset - offset)
+ msg_len = end_offset - offset;
+
+ msg_off = 0;
+ while (msg_off < msg_len) {
+ void *mr_data = sg_virt(mr_sg);
+ size_t chunk_size = min_t(size_t, msg_len - msg_off, PAGE_SIZE - mr_off);
+
+ if (to_msg)
+ res = copy_to_iter(mr_data + mr_off, chunk_size, &it);
+ else
+ res = copy_from_iter(mr_data + mr_off, chunk_size, &it);
+ BUG_ON(res != chunk_size);
+
+ mr_off += chunk_size;
+ msg_off += chunk_size;
+ if (mr_off == PAGE_SIZE) {
+ mr_sg = sg_next(mr_sg);
+ mr_off = 0;
+ }
+ }
+
+ offset += msg_len;
+ }
+
+ return 0;
+}
+
+int pcs_rdma_mr_pool_init(struct pcs_rdma_mr_pool *pool, size_t mr_size,
+ size_t mr_cnt, struct ib_device *dev, struct ib_pd *pd,
+ enum dma_data_direction dir, gfp_t gfp, struct pcs_ib_mr_pool *ib_mr_pool)
+{
+ struct pcs_rdma_mr *mr;
+ int ret, i;
+
+ pool->mr_size = mr_size;
+ pool->mr_cnt = mr_cnt;
+ pool->dev = dev;
+ pool->pd = pd;
+ pool->dir = dir;
+ pool->gfp = gfp;
+ pool->ib_mr_pool = ib_mr_pool;
+
+ spin_lock_init(&pool->lock);
+
+ INIT_LIST_HEAD(&pool->mr_list);
+ pool->used_mrs = 0;
+
+ for (i = 0; i < mr_cnt; i++) {
+ mr = pcs_rdma_mr_alloc(dev, pd, dir, mr_size, gfp, ib_mr_pool);
+ if (IS_ERR(mr)) {
+ ret = PTR_ERR(mr);
+ goto fail;
+ }
+ list_add_tail(&mr->entry, &pool->mr_list);
+ }
+
+ return 0;
+
+fail:
+ pcs_rdma_mr_pool_destroy(pool);
+ return ret;
+}
+
+void pcs_rdma_mr_pool_destroy(struct pcs_rdma_mr_pool *pool)
+{
+ struct pcs_rdma_mr *mr, *tmp;
+
+ spin_lock_irq(&pool->lock);
+ BUG_ON(pool->used_mrs);
+ spin_unlock_irq(&pool->lock);
+
+ list_for_each_entry_safe(mr, tmp, &pool->mr_list, entry)
+ pcs_rdma_mr_free(mr);
+}
+
+struct pcs_rdma_mr* pcs_rdma_mr_pool_get(struct pcs_rdma_mr_pool *pool)
+{
+ struct pcs_rdma_mr *mr;
+ unsigned long flags;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ mr = list_first_entry_or_null(&pool->mr_list, struct pcs_rdma_mr, entry);
+ if (mr) {
+ list_del(&mr->entry);
+ BUG_ON(pool->used_mrs >= pool->mr_cnt);
+ pool->used_mrs++;
+ }
+ spin_unlock_irqrestore(&pool->lock, flags);
+
+ if (!mr && !in_interrupt()) {
+ mr = pcs_rdma_mr_alloc(pool->dev, pool->pd, pool->dir,
+ pool->mr_size, pool->gfp,
+ pool->ib_mr_pool);
+ if (IS_ERR(mr))
+ return NULL;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ pool->mr_cnt++;
+ BUG_ON(pool->used_mrs >= pool->mr_cnt);
+ pool->used_mrs++;
+ spin_unlock_irqrestore(&pool->lock, flags);
+ }
+
+ return mr;
+}
+
+void pcs_rdma_mr_pool_put(struct pcs_rdma_mr_pool *pool, struct pcs_rdma_mr *mr)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ list_add(&mr->entry, &pool->mr_list);
+ BUG_ON(!pool->used_mrs);
+ pool->used_mrs--;
+ spin_unlock_irqrestore(&pool->lock, flags);
+}
+
+static void pcs_rdma_msg_cleanup_map(struct pcs_rdma_msg *rdma_msg)
+{
+ pcs_rdma_mr_destroy(rdma_msg->mr);
+}
+
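+/*
+ * Zero-copy variant: register the message's own pages as an MR instead
+ * of bouncing the data through preallocated pool memory.
+ */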
+static int pcs_rdma_msg_init_map(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+ size_t offset, size_t end_offset,
+ struct pcs_rdma_mr_pool *pool)
+{
+ int ret;
+
+ if (offset >= end_offset || end_offset > msg->size)
+ return -EINVAL;
+
+ rdma_msg->msg = msg;
+ rdma_msg->offset = offset;
+ rdma_msg->end_offset = end_offset;
+
+ rdma_msg->pool = NULL;
+ rdma_msg->cleanup = pcs_rdma_msg_cleanup_map;
+
+ rdma_msg->first_wr = NULL;
+ rdma_msg->last_wr = NULL;
+
+ rdma_msg->mr = &rdma_msg->_inline_mr;
+ ret = pcs_rdma_mr_init_from_msg(rdma_msg->mr, pool->dev, pool->pd, pool->dir,
+ msg, offset, end_offset, pool->gfp, pool->ib_mr_pool);
+ if (ret)
+ return ret;
+
+ rdma_msg->lkey = rdma_msg->mr->mr->lkey;
+ rdma_msg->rkey = rdma_msg->mr->mr->rkey;
+ rdma_msg->iova = rdma_msg->mr->mr->iova;
+
+ if (rdma_msg->mr->first_wr && rdma_msg->mr->last_wr) {
+ rdma_msg->first_wr = rdma_msg->mr->first_wr;
+ rdma_msg->last_wr = rdma_msg->mr->last_wr;
+ rdma_msg->mr->first_wr = NULL;
+ rdma_msg->mr->last_wr = NULL;
+ }
+
+ return 0;
+}
+
+static void pcs_rdma_msg_cleanup(struct pcs_rdma_msg *rdma_msg)
+{
+ pcs_rdma_mr_sync_for_cpu(rdma_msg->mr,
+ PAGE_ALIGN(rdma_msg->end_offset - rdma_msg->offset));
+ if (rdma_msg->mr->dir == DMA_BIDIRECTIONAL ||
+ rdma_msg->mr->dir == DMA_FROM_DEVICE)
+ BUG_ON(pcs_rdma_mr_sync_for_msg(rdma_msg->mr, rdma_msg->msg,
+ rdma_msg->offset,
+ rdma_msg->end_offset, true));
+
+ if (rdma_msg->pool)
+ pcs_rdma_mr_pool_put(rdma_msg->pool, rdma_msg->mr);
+ else /* fallback */
+ pcs_rdma_mr_destroy(rdma_msg->mr);
+}
+
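+/*
+ * Prepare a message range for an RDMA transfer. With try_to_map set the
+ * message pages are registered directly (zero-copy); otherwise the data
+ * is bounced through an MR taken from the pool, or through a one-off
+ * inline MR if the range does not fit pool->mr_size.
+ */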
+int pcs_rdma_msg_init(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, struct pcs_rdma_mr_pool *pool,
+ bool try_to_map)
+{
+ size_t mr_size = PAGE_ALIGN(end_offset - offset);
+ int ret;
+
+ if (offset >= end_offset || end_offset > msg->size)
+ return -EINVAL;
+
+ if (try_to_map && !pcs_rdma_msg_init_map(rdma_msg, msg, offset, end_offset, pool))
+ return 0;
+
+ rdma_msg->msg = msg;
+ rdma_msg->offset = offset;
+ rdma_msg->end_offset = end_offset;
+
+ rdma_msg->pool = mr_size > pool->mr_size ? NULL : pool;
+ rdma_msg->cleanup = pcs_rdma_msg_cleanup;
+
+ /* For testing fallback */
+ RE_SET(rdma_msg->pool, NULL);
+
+ rdma_msg->first_wr = NULL;
+ rdma_msg->last_wr = NULL;
+
+ if (rdma_msg->pool) {
+ rdma_msg->mr = RE_NULL(pcs_rdma_mr_pool_get(rdma_msg->pool));
+ if (!rdma_msg->mr)
+ return -ENOMEM;
+ } else { /* fallback */
+ rdma_msg->mr = &rdma_msg->_inline_mr;
+ ret = pcs_rdma_mr_init(rdma_msg->mr, pool->dev, pool->pd, pool->dir,
+ mr_size, pool->gfp, pool->ib_mr_pool);
+ if (ret)
+ return ret;
+ }
+
+ if (rdma_msg->mr->dir == DMA_BIDIRECTIONAL ||
+ rdma_msg->mr->dir == DMA_TO_DEVICE)
+ BUG_ON(pcs_rdma_mr_sync_for_msg(rdma_msg->mr, msg, offset,
+ end_offset, false));
+ pcs_rdma_mr_sync_for_device(rdma_msg->mr, mr_size);
+
+ rdma_msg->lkey = rdma_msg->mr->mr->lkey;
+ rdma_msg->rkey = rdma_msg->mr->mr->rkey;
+ rdma_msg->iova = rdma_msg->mr->mr->iova;
+
+ if (rdma_msg->mr->first_wr && rdma_msg->mr->last_wr) {
+ rdma_msg->first_wr = rdma_msg->mr->first_wr;
+ rdma_msg->last_wr = rdma_msg->mr->last_wr;
+ rdma_msg->mr->first_wr = NULL;
+ rdma_msg->mr->last_wr = NULL;
+ }
+
+ return 0;
+}
+
+void pcs_rdma_msg_destroy(struct pcs_rdma_msg *rdma_msg)
+{
+ rdma_msg->cleanup(rdma_msg);
+}
diff --git a/fs/fuse/kio/pcs/pcs_rdma_rw.h b/fs/fuse/kio/pcs/pcs_rdma_rw.h
new file mode 100644
index 000000000000..22ea59d02baf
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_rdma_rw.h
@@ -0,0 +1,184 @@
+#ifndef _PCS_RDMA_RW_H_
+#define _PCS_RDMA_RW_H_ 1
+
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/scatterlist.h>
+#include <linux/dma-direction.h>
+
+#include <rdma/ib_verbs.h>
+
+#include "pcs_sock_io.h"
+
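+/*
+ * Fault-injection helpers for exercising the fallback paths: when
+ * rdmaio_io_failing is set (debug builds only), the RE*() macros
+ * periodically substitute an error/NULL/forced value for the real result.
+ */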
+#if defined(CONFIG_DEBUG_KERNEL) && defined(CONFIG_FUSE_KIO_DEBUG)
+extern u32 rdmaio_io_failing;
+#define RE_FAIL() \
+({ \
+ static atomic_t __fail_cnt = ATOMIC_INIT(1); \
+ static atomic_t __fail_hop = ATOMIC_INIT(1); \
+ bool __ret = false; \
+ if (rdmaio_io_failing && atomic_dec_return(&__fail_cnt) <= 0) { \
+ atomic_add(atomic_inc_return(&__fail_hop), &__fail_cnt); \
+ TRACE("RE: fail!!!\n"); \
+ __ret = true; \
+ } \
+ __ret; \
+})
+#define RE(func, err) (RE_FAIL() ? err : (func))
+#define RE_PTR(func, err) (RE_FAIL() ? ERR_PTR(err) : (func))
+#define RE_NULL(func) (RE_FAIL() ? NULL : (func))
+#define RE_SET(var, val) { if (RE_FAIL()) var = val; }
+#else
+#define RE(func, err) func
+#define RE_PTR(func, err) func
+#define RE_NULL(func) func
+#define RE_SET(var, val)
+#endif
+
+#define RE_INV(func) RE(func, -EINVAL)
+#define RE_PTR_INV(func) RE_PTR(func, -EINVAL)
+
+struct pcs_sgl_buf
+{
+ struct page **pages;
+ size_t pg_cnt;
+ void (*page_clean)(struct page *page);
+
+ struct scatterlist *sgl;
+ size_t sg_cnt;
+};
+
+int pcs_sgl_buf_init_from_msg(struct pcs_sgl_buf *sbuf, unsigned int dir, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, gfp_t gfp, bool allow_gaps);
+int pcs_sgl_buf_init(struct pcs_sgl_buf *sbuf, size_t size, gfp_t gfp);
+void pcs_sgl_buf_destroy(struct pcs_sgl_buf *sbuf);
+
+struct pcs_ib_mr_pool
+{
+ struct ib_pd *pd;
+ enum ib_mr_type mr_type;
+ u32 max_num_sg;
+ size_t mr_cnt;
+
+ spinlock_t lock;
+
+ struct list_head mr_list;
+ size_t used_mrs;
+};
+
+int pcs_ib_mr_pool_init(struct pcs_ib_mr_pool *pool, struct ib_pd *pd,
+ enum ib_mr_type mr_type, u32 max_num_sg, size_t mr_cnt);
+void pcs_ib_mr_pool_destroy(struct pcs_ib_mr_pool *pool);
+
+struct ib_mr* pcs_ib_mr_pool_get(struct pcs_ib_mr_pool *pool);
+void pcs_ib_mr_pool_put(struct pcs_ib_mr_pool *pool, struct ib_mr *mr);
+
+struct pcs_rdma_rw
+{
+ struct ib_device *dev;
+ enum dma_data_direction dir;
+
+ struct pcs_sgl_buf sbuf;
+
+ struct ib_sge *sges;
+ struct ib_rdma_wr *wrs;
+ size_t nr_wrs;
+};
+
+int pcs_rdma_rw_init_from_msg(struct pcs_rdma_rw *rw, struct ib_device *dev,
+ enum dma_data_direction dir, u64 remote_addr, u32 rkey,
+ u32 local_dma_lkey, struct pcs_msg *msg, size_t offset,
+ size_t end_offset, gfp_t gfp, size_t sge_per_wr);
+void pcs_rdma_rw_destroy(struct pcs_rdma_rw *rw);
+
+struct pcs_rdma_mr
+{
+ struct list_head entry;
+
+ struct ib_device *dev;
+ struct ib_pd *pd;
+ enum dma_data_direction dir;
+ size_t size;
+ struct pcs_ib_mr_pool *ib_mr_pool;
+
+ struct pcs_sgl_buf sbuf;
+ struct ib_mr *mr;
+
+ struct ib_send_wr inv_wr;
+ struct ib_reg_wr reg_wr;
+
+ struct ib_send_wr *first_wr;
+ struct ib_send_wr *last_wr;
+};
+
+int pcs_rdma_mr_init_from_msg(struct pcs_rdma_mr *mr, struct ib_device *dev,
+ struct ib_pd *pd, enum dma_data_direction dir, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool);
+int pcs_rdma_mr_init(struct pcs_rdma_mr *mr, struct ib_device *dev,
+ struct ib_pd *pd, enum dma_data_direction dir, size_t size,
+ gfp_t gfp, struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_destroy(struct pcs_rdma_mr *mr);
+
+struct pcs_rdma_mr* pcs_rdma_mr_alloc(struct ib_device *dev, struct ib_pd *pd,
+ enum dma_data_direction dir, size_t size, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_free(struct pcs_rdma_mr *mr);
+
+void pcs_rdma_mr_sync_for_cpu(struct pcs_rdma_mr *mr, size_t size);
+void pcs_rdma_mr_sync_for_device(struct pcs_rdma_mr *mr, size_t size);
+
+int pcs_rdma_mr_sync_for_msg(struct pcs_rdma_mr *mr, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, bool to_msg);
+
+struct pcs_rdma_mr_pool
+{
+ size_t mr_size;
+ size_t mr_cnt;
+ struct ib_device *dev;
+ struct ib_pd *pd;
+ enum dma_data_direction dir;
+ gfp_t gfp;
+ struct pcs_ib_mr_pool *ib_mr_pool;
+
+ spinlock_t lock;
+
+ struct list_head mr_list;
+ size_t used_mrs;
+};
+
+int pcs_rdma_mr_pool_init(struct pcs_rdma_mr_pool *pool, size_t mr_size,
+ size_t mr_cnt, struct ib_device *dev, struct ib_pd *pd,
+ enum dma_data_direction dir, gfp_t gfp,
+ struct pcs_ib_mr_pool *ib_mr_pool);
+void pcs_rdma_mr_pool_destroy(struct pcs_rdma_mr_pool *pool);
+
+struct pcs_rdma_mr* pcs_rdma_mr_pool_get(struct pcs_rdma_mr_pool *pool);
+void pcs_rdma_mr_pool_put(struct pcs_rdma_mr_pool *pool,
+ struct pcs_rdma_mr *mr);
+
+struct pcs_rdma_msg
+{
+ struct pcs_msg *msg;
+ size_t offset;
+ size_t end_offset;
+
+ struct pcs_rdma_mr_pool *pool;
+ struct pcs_rdma_mr *mr;
+ struct pcs_rdma_mr _inline_mr;
+ void (*cleanup)(struct pcs_rdma_msg *rdma_msg);
+
+ u32 lkey;
+ u32 rkey;
+ u64 iova;
+
+ struct ib_send_wr *first_wr;
+ struct ib_send_wr *last_wr;
+};
+
+int pcs_rdma_msg_init(struct pcs_rdma_msg *rdma_msg, struct pcs_msg *msg,
+ size_t offset, size_t end_offset, struct pcs_rdma_mr_pool *pool,
+ bool try_to_map);
+void pcs_rdma_msg_destroy(struct pcs_rdma_msg *rdma_msg);
+
+#endif /* _PCS_RDMA_RW_H_ */