[Devel] [PATCH RHEL7 COMMIT] fs/fuse kio: allocate memory for each RDMA RX buffer separately

Vasily Averin vvs at virtuozzo.com
Thu Aug 6 09:49:59 MSK 2020


The commit is pushed to "branch-rh7-3.10.0-1127.8.2.vz7.158.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-1127.8.2.vz7.158.4
------>
commit 253a21956feab917a2d170d7c6b541ec0847575f
Author: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
Date:   Thu Aug 6 09:49:59 2020 +0300

    fs/fuse kio: allocate memory for each RDMA RX buffer separately
    
    Currently we allocate the memory for the array of RDMA RX buffers as
    a single physically contiguous chunk. Such a high-order allocation
    may fail when memory is fragmented, so allocate the memory for each
    RX buffer separately: nothing requires the RX buffers to be
    physically contiguous.
    
    Signed-off-by: Ildar Ismagilov <Ildar.Ismagilov at acronis.com>
    Acked-by: Alexey Kuznetsov <Alexey.Kuznetsov at acronis.com>
---
 fs/fuse/kio/pcs/pcs_rdma_io.c | 69 ++++++++++++++++++++++---------------------
 fs/fuse/kio/pcs/pcs_rdma_io.h |  4 +--
 2 files changed, 36 insertions(+), 37 deletions(-)
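
For readers skimming the diff, here is a minimal sketch of the allocation
scheme the patch moves to. The demo_* names are illustrative only, not from
the tree; the real code is rio_init_rx()/rio_fini_rx() plus the loops in
pcs_rdma_create() and rio_destroy() below, and the gfp flags are copied from
the patch itself:

  struct demo_rx {
          char *buf;
          dma_addr_t dma_addr;
  };

  /*
   * One ib_dma_alloc_coherent() call per RX descriptor: each buffer is a
   * separate msg_size-byte allocation, so no single high-order chunk is
   * needed. Returns how many buffers were successfully allocated.
   */
  static int demo_alloc_rx_bufs(struct ib_device *dev, struct demo_rx *rxs,
                                int depth, size_t msg_size)
  {
          int n;

          for (n = 0; n < depth; n++) {
                  rxs[n].buf = ib_dma_alloc_coherent(dev, msg_size,
                                                     &rxs[n].dma_addr,
                                                     GFP_NOIO | __GFP_NOWARN);
                  if (!rxs[n].buf)
                          break;
          }
          return n;
  }

  /* Free exactly the 'n' buffers that were allocated, which handles any
   * state of partial success. */
  static void demo_free_rx_bufs(struct ib_device *dev, struct demo_rx *rxs,
                                int n, size_t msg_size)
  {
          int i;

          for (i = 0; i < n; i++)
                  ib_dma_free_coherent(dev, msg_size, rxs[i].buf,
                                       rxs[i].dma_addr);
  }

In the patch the count of successful allocations is kept in
rio->recv_queue_depth, so the same free loop serves both the error path in
pcs_rdma_create() and teardown in rio_destroy().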

diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
index 7b42cdc..dc55a9c 100644
--- a/fs/fuse/kio/pcs/pcs_rdma_io.c
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -46,6 +46,9 @@ struct rio_rx {
 	struct rio_cqe cqe;
 	struct ib_sge sge;
 	struct ib_recv_wr wr;
+
+	char *buf;
+	dma_addr_t dma_addr;
 };
 
 enum {
@@ -121,19 +124,6 @@ static void rio_rx_done(struct rio_cqe *cqe, bool sync_mode);
 static void rio_tx_done(struct rio_cqe *cqe, bool sync_mode);
 static void rio_tx_err_occured(struct rio_cqe *cqe, bool sync_mode);
 
-/*
- * A trivial helper representing 1:1 mapping between
- * rio->rx_descs[RIO_N_RX] and rio->rx_bufs[queue_depth * RIO_MSG_SIZE]
- */
-static char *rx2buf(struct pcs_rdmaio *rio, struct rio_rx *rx)
-{
-	return rio->rx_bufs + RIO_MSG_SIZE * (rx - rio->rx_descs);
-}
-static dma_addr_t rx2dma(struct pcs_rdmaio *rio, struct rio_rx *rx)
-{
-	return rio->rx_bufs_dma + RIO_MSG_SIZE * (rx - rio->rx_descs);
-}
-
 /* Only called when rio->write_queue is not empty */
 static struct pcs_msg *rio_dequeue_msg(struct pcs_rdmaio *rio)
 {
@@ -227,6 +217,19 @@ static void rio_put_tx(struct pcs_rdma_device *dev, struct rio_tx *tx)
 	dev->free_txs_cnt++;
 }
 
+static bool rio_init_rx(struct rio_rx *rx, struct ib_device *dev)
+{
+	rx->buf = RE_NULL(ib_dma_alloc_coherent(dev, RIO_MSG_SIZE,
+						&rx->dma_addr,
+						GFP_NOIO | __GFP_NOWARN));
+	return rx->buf;
+}
+
+static void rio_fini_rx(struct rio_rx *rx, struct ib_device *dev)
+{
+	ib_dma_free_coherent(dev, RIO_MSG_SIZE, rx->buf, rx->dma_addr);
+}
+
 enum {
 	SUBMIT_REGULAR,
 	SUBMIT_NOOP,
@@ -278,7 +281,7 @@ static int rio_rx_post(struct pcs_rdmaio *rio, struct rio_rx *rx,
 
 	rx->rio = rio;
 
-	rx->sge.addr = rx2dma(rio, rx);
+	rx->sge.addr = rx->dma_addr;
 	rx->sge.length = length;
 	rx->sge.lkey = rio->dev->pd->local_dma_lkey;
 
@@ -886,7 +889,6 @@ static int rio_handle_rx_read_ack(struct pcs_rdmaio *rio,
 static void rio_handle_rx(struct pcs_rdmaio *rio, struct rio_rx *rx,
 			  enum ib_wc_status status, int pended)
 {
-	char *buf = rx2buf(rio, rx);
 	int ok = (status == IB_WC_SUCCESS) &&
 		 (rio->rio_state == RIO_STATE_ESTABLISHED);
 	char *payload = NULL;
@@ -903,7 +905,7 @@ static void rio_handle_rx(struct pcs_rdmaio *rio, struct rio_rx *rx,
 		return;
 	}
 
-	type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+	type = rio_parse_hdr(rx->buf, &payload, &payload_size, &credits, &rb, &rack,
 			     rio->queue_depth);
 
 	switch (type) {
@@ -1112,8 +1114,10 @@ static struct pcs_rdma_device *pcs_rdma_device_create(struct rdma_cm_id *cmid,
 	INIT_LIST_HEAD(&dev->free_txs);
 	for (i = 0; i < send_queue_depth; i++) {
 		tx = rio_alloc_tx(dev, TX_MSG_DONE);
-		if (!tx)
+		if (!tx) {
+			TRACE("rio_alloc_tx failed: dev: 0x%p\n", dev);
 			goto free_bufs;
+		}
 		rio_put_tx(dev, tx);
 	}
 
@@ -1185,11 +1189,10 @@ struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
 	int recv_queue_depth = queue_depth * 2 + 2;
 	int send_queue_depth = queue_depth * 4 + 4;
 	int rx_descs_siz = recv_queue_depth * sizeof(struct rio_rx);
-	int rx_bufs_siz = recv_queue_depth * RIO_MSG_SIZE;
 	static atomic_t comp_vector = ATOMIC_INIT(-1);
 	unsigned int cq_count = rdmaio_cq_count;
 	unsigned int cq_period = rdmaio_cq_period;
-	int max_recv_wr;
+	int max_recv_wr, i;
 
 	BUG_ON(!mutex_is_locked(&ep->mutex));
 
@@ -1246,13 +1249,13 @@ struct pcs_rdmaio* pcs_rdma_create(int hdr_size, struct rdma_cm_id *cmid,
 	if (!rio->rx_descs)
 		goto free_rio;
 
-	rio->rx_bufs_size = rx_bufs_siz;
-	rio->rx_bufs = RE_NULL(ib_dma_alloc_coherent(rio->cmid->device, rio->rx_bufs_size,
-						     &rio->rx_bufs_dma,
-						     GFP_NOIO | __GFP_NOWARN));
-	if (!rio->rx_bufs) {
-		TRACE("ib_dma_alloc_coherent failed: rio: 0x%p\n", rio);
-		goto free_desc;
+	rio->recv_queue_depth = 0;
+	for (i = 0; i < recv_queue_depth; i++) {
+		if (!rio_init_rx(rio->rx_descs + i, rio->cmid->device)) {
+			TRACE("rio_init_rx failed: rio: 0x%p\n", rio);
+			goto free_bufs;
+		}
+		rio->recv_queue_depth++;
 	}
 
 	rio->dev = pcs_rdma_device_create(rio->cmid, queue_depth, send_queue_depth);
@@ -1321,9 +1324,8 @@ free_cq:
 free_dev:
 	pcs_rdma_device_destroy(rio->dev);
 free_bufs:
-	ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
-			     rio->rx_bufs, rio->rx_bufs_dma);
-free_desc:
+	for (i = 0; i < rio->recv_queue_depth; i++)
+		rio_fini_rx(rio->rx_descs + i, rio->cmid->device);
 	kfree(rio->rx_descs);
 free_rio:
 	pcs_rpc_put(rio->netio.parent);
@@ -1400,6 +1402,7 @@ static void rio_destroy(struct work_struct *work)
 
 	llist_for_each_entry_safe(rio, tmp, list, destroy_node) {
 		struct pcs_rpc *ep = rio->netio.parent;
+		int i;
 
 		mutex_lock(&ep->mutex);
 
@@ -1422,8 +1425,8 @@ static void rio_destroy(struct work_struct *work)
 		BUG_ON(rio->dev->free_txs_cnt != rio->send_queue_depth);
 
 		pcs_rdma_device_destroy(rio->dev);
-		ib_dma_free_coherent(rio->cmid->device, rio->rx_bufs_size,
-				     rio->rx_bufs, rio->rx_bufs_dma);
+		for (i = 0; i < rio->recv_queue_depth; i++)
+			rio_fini_rx(rio->rx_descs + i, rio->cmid->device);
 		kfree(rio->rx_descs);
 
 		rdma_destroy_id(rio->cmid);
@@ -1669,7 +1672,6 @@ static int pcs_rdma_sync_recv(struct pcs_netio *netio, struct pcs_msg **msg)
 	struct pcs_rpc *ep = netio->parent;
 	struct rio_rx *rx;
 	struct rio_cqe *cqe;
-	char *buf;
 	int   type;
 	char *payload;
 	int payload_size;
@@ -1698,8 +1700,7 @@ static int pcs_rdma_sync_recv(struct pcs_netio *netio, struct pcs_msg **msg)
 
 	rx = container_of(cqe, struct rio_rx, cqe);
 
-	buf = rx2buf(rio, rx);
-	type = rio_parse_hdr(buf, &payload, &payload_size, &credits, &rb, &rack,
+	type = rio_parse_hdr(rx->buf, &payload, &payload_size, &credits, &rb, &rack,
 			     rio->queue_depth);
 	if (type != RIO_MSG_IMMEDIATE || rb) {
 		ret = -EFAULT;
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.h b/fs/fuse/kio/pcs/pcs_rdma_io.h
index 912be17..b411098e 100644
--- a/fs/fuse/kio/pcs/pcs_rdma_io.h
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.h
@@ -57,6 +57,7 @@ struct pcs_rdmaio
 	 * blindly accepts the value. */
 	int queue_depth;
 	int send_queue_depth;
+	int recv_queue_depth;
 	int max_send_wr;
 
 	int send_timeout;
@@ -68,9 +69,6 @@ struct pcs_rdmaio
 	struct list_head tx_jobs; /* list head of TX jobs */
 
 	struct rio_rx *rx_descs; /* plain array of RX descriptors */
-	char *rx_bufs;           /* MR-ed area for payload of RXs */
-	size_t rx_bufs_size;
-	dma_addr_t rx_bufs_dma;
 	struct list_head pended_rxs; /* list head of pended RX frames */
 
 	int n_rx_posted; /* # posted RXs */

