[Devel] [PATCH VZ9 8/10] fs/fuse/kio: crc calculation in pcs_cs_accel was wrong

Alexey Kuznetsov kuznet at virtuozzo.com
Fri Jan 17 21:09:18 MSK 2025


It did not test cached pages for uptodate status, which led
to random crc failures; luckily they were not fatal, since
in the case of failure we just proceeded to user space for
details, yet it was annoying and suboptimal. It was made
even more suboptimal by allocating memory for csum arrays
when we could use direct references to the page cache.

Also, copy our own crc32c implementation from user space.
The mainstream kernel one is some dorkish crock: it is 50% slower than ours,
and judging by future mainstream improvements it will be broken
even further. So, let us have the option to keep our proven stable
and fast implementation.

Signed-off-by: Alexey Kuznetsov <kuznet at virtuozzo.com>
---
 fs/fuse/Makefile                   |   1 +
 fs/fuse/kio/pcs/pcs_crc32c.c       | 310 +++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_crc32c.h       |  13 ++
 fs/fuse/kio/pcs/pcs_cs_accel.c     | 162 ++++++++++++++++---
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |   3 +
 fs/fuse/kio/pcs/pcs_req.h          |   5 +-
 6 files changed, 470 insertions(+), 24 deletions(-)
 create mode 100644 fs/fuse/kio/pcs/pcs_crc32c.c
 create mode 100644 fs/fuse/kio/pcs/pcs_crc32c.h

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index 18eaa35..647ca8f 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -36,6 +36,7 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/pcs_cs_accel.o \
 	kio/pcs/pcs_rpc_clnt.o \
 	kio/pcs/pcs_mr.o \
+	kio/pcs/pcs_crc32c.o \
 	kio/pcs/pcs_krpc.o
 
 fuse_kio_pcs_trace-objs := kio/pcs/fuse_kio_pcs_trace.o
diff --git a/fs/fuse/kio/pcs/pcs_crc32c.c b/fs/fuse/kio/pcs/pcs_crc32c.c
new file mode 100644
index 0000000..3755f47
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_crc32c.c
@@ -0,0 +1,310 @@
+/*
+ * Copyright (c) 2003-2018 Acronis International GmbH.
+ */
+
+#include <linux/types.h>
+#include <linux/string.h>
+#include <linux/module.h>
+#include <asm/cpufeatures.h>
+#include <asm/cpu_device_id.h>
+
+#include "pcs_crc32c.h"
+
+int pcs_crc_may_inline = -1;
+
+static int pcs_crc32c_initialized;
+
+#ifdef CONFIG_X86_64
+
+/*
+ * This is an Intel SSE4.2 optimized implementation of CRC32, spending 3 cycles on 8 bytes
+ * and achieving ~3GB/sec in practice.
+ * http://download.intel.com/design/intarch/papers/323405.pdf
+ */
+
+#define sse_crc32_u8(crc, bptr)	asm volatile("crc32b %2,%1" : "=r"(crc) : \
+"0"(crc), "r"(*(unsigned char *)(bptr)))
+#define sse_crc32_u64(crc, lptr)	asm volatile("crc32q %2,%1" : "=r"(crc) : \
+"0"(crc), "r"(*(uint64_t *)(lptr)))
+
+/* ------------------ taken from original crc32_hw1.c SMHasher -------------------- */
+/* Compile with gcc -O3 -msse4.2 ... */
+
+/* crc32c.c -- compute CRC-32C using the Intel crc32 instruction
+ * Copyright (C) 2013 Mark Adler
+ * Version 1.1  1 Aug 2013  Mark Adler
+ */
+
+/*
+ * This software is provided 'as-is', without any express or implied
+ * warranty.  In no event will the author be held liable for any damages
+ * arising from the use of this software.
+ *
+ * Permission is granted to anyone to use this software for any purpose,
+ * including commercial applications, and to alter it and redistribute it
+ * freely, subject to the following restrictions:
+ *
+ * 1. The origin of this software must not be misrepresented; you must not
+ * claim that you wrote the original software. If you use this software
+ * in a product, an acknowledgment in the product documentation would be
+ * appreciated but is not required.
+ * 2. Altered source versions must be plainly marked as such, and must not be
+ * misrepresented as being the original software.
+ * 3. This notice may not be removed or altered from any source distribution.
+ *
+ * Mark Adler
+ * madler at alumni.caltech.edu
+ */
+
+/* Use hardware CRC instruction on Intel SSE 4.2 processors.  This computes a
+ * CRC-32C, *not* the CRC-32 used by Ethernet and zip, gzip, etc.
+ */
+
+/* Version history:
+ * 1.0  10 Feb 2013  First version
+ * 1.1   1 Aug 2013  Correct comments on why three crc instructions in parallel
+ */
+
+/* CRC-32C (iSCSI) polynomial in reversed bit order. */
+#define POLY 0x82f63b78
+
+/* Block sizes for three-way parallel crc computation. */
+//#define LONG	(1024 * 8)
+#define SHORT	((512 / 4 / 8) * 8)
+
+/* Tables for hardware crc that shift a crc by LONG and SHORT zeros. */
+#ifdef LONG
+static uint32_t crc32c_long[4][256];
+#endif
+#ifdef SHORT
+static u32 crc32c_short[4][256];
+#endif
+
+/* Multiply a matrix times a vector over the Galois field of two elements,
+ * GF(2).  Each element is a bit in an unsigned integer.  mat must have at
+ * least as many entries as the power of two for most significant one bit in
+ * vec.
+ */
+static inline uint32_t gf2_matrix_mul_scalar(const uint32_t *mat, uint32_t vec)
+{
+	uint32_t sum = 0;
+
+	while (vec) {
+		if (vec & 1)
+			sum ^= *mat;
+		vec >>= 1;
+		mat++;
+	}
+	return sum;
+}
+
+/* Multiply a matrix by itself over GF(2).  Both mat and square must have 32
+ * rows.
+ */
+static inline void gf2_matrix_mul(uint32_t *dst, const uint32_t *src)
+{
+	uint32_t tmp[32];
+	int n;
+
+	for (n = 0; n < 32; n++)
+		tmp[n] = gf2_matrix_mul_scalar(dst, src[n]);
+	memcpy(dst, tmp, sizeof(tmp));
+}
+
+/* Calculate power pwr of matrix mat */
+static void gf2_matrix_power(uint32_t *mat, size_t pwr)
+{
+	uint32_t tmp[32];
+	int n;
+
+	for (n = 0; n < 32; n++)
+		tmp[n] = 1 << n;
+
+	while (pwr > 1) {
+		if (pwr & 1)
+			gf2_matrix_mul(tmp, mat);
+		gf2_matrix_mul(mat, mat);
+		pwr >>= 1;
+	}
+	gf2_matrix_mul(mat, tmp);
+}
+
+/* Take a length and build four lookup tables for applying the zeros operator
+ * for that length, byte-by-byte on the operand.
+ */
+static void crc32c_zeros(uint32_t zeros[][256], size_t len)
+{
+	uint32_t n;
+	uint32_t op[32];
+
+	op[0] = POLY;	/* CRC-32C polynomial */
+	for (n = 1; n < 32; n++)
+		op[n] = 1 << (n - 1);
+
+	gf2_matrix_power(op, len * 8);
+
+	for (n = 0; n < 256; n++) {
+		zeros[0][n] = gf2_matrix_mul_scalar(op, n);
+		zeros[1][n] = gf2_matrix_mul_scalar(op, n << 8);
+		zeros[2][n] = gf2_matrix_mul_scalar(op, n << 16);
+		zeros[3][n] = gf2_matrix_mul_scalar(op, n << 24);
+	}
+}
+
+/* Apply the zeros operator table to crc. */
+static inline uint32_t crc32c_shift(uint32_t zeros[][256], uint32_t crc)
+{
+	return zeros[0][crc & 0xff] ^ zeros[1][(crc >> 8) & 0xff] ^
+		zeros[2][(crc >> 16) & 0xff] ^ zeros[3][crc >> 24];
+}
+
+/* Initialize tables for shifting crcs. */
+void __init pcs_crc32c_init_hw(void)
+{
+	static const struct x86_cpu_id crc32c_cpu_id[] = {
+		X86_MATCH_FEATURE(X86_FEATURE_XMM4_2, NULL),
+	{}
+	};
+
+	if (!x86_match_cpu(crc32c_cpu_id)) {
+		pcs_crc_may_inline = 0;
+		return;
+	}
+
+#ifdef LONG
+	crc32c_zeros(crc32c_long, LONG);
+#endif
+#ifdef SHORT
+	crc32c_zeros(crc32c_short, SHORT);
+#endif
+	pcs_crc32c_initialized = 1;
+	if (pcs_crc_may_inline < 0)
+		pcs_crc_may_inline = 1;
+}
+
+/* Compute CRC-32C using the Intel hardware instruction. */
+unsigned int pcs_crc32up_sse(unsigned int crc, const unsigned char *buf, unsigned int len)
+{
+	const unsigned char *next = buf;
+	unsigned int i;
+	uint64_t crc0, crc1, crc2, crc3;      /* need to be 64 bits for crc32q */
+
+	/* pre-process the crc */
+	crc0 = crc;
+
+	/* compute the crc for up to seven leading bytes to bring the data pointer
+	 * to an eight-byte boundary
+	 */
+	while (len && ((uintptr_t)next & 7) != 0) {
+		sse_crc32_u8(crc0, next);
+		next++;
+		len--;
+	}
+
+#ifdef LONG
+	/* compute the crc on sets of LONG*4 bytes, executing three independent crc
+	 * instructions, each on LONG bytes -- this is optimized for the Nehalem,
+	 * Westmere, Sandy Bridge, and Ivy Bridge architectures, which have a
+	 * throughput of one crc per cycle, but a latency of three cycles
+	 */
+	while (len >= LONG*4) {
+		crc1 = 0;
+		crc2 = 0;
+		crc3 = 0;
+		for (i = 0; i < LONG / 8; i++) {
+			sse_crc32_u64(crc0, next);
+			sse_crc32_u64(crc1, next + LONG);
+			sse_crc32_u64(crc2, next + LONG * 2);
+			sse_crc32_u64(crc3, next + LONG * 3);
+			next += 8;
+		}
+		crc0 = crc32c_shift(crc32c_long, (uint32_t)crc0) ^ crc1;
+		crc0 = crc32c_shift(crc32c_long, (uint32_t)crc0) ^ crc2;
+		crc0 = crc32c_shift(crc32c_long, (uint32_t)crc0) ^ crc3;
+		next += LONG*3;
+		len -= LONG*4;
+	}
+#endif
+
+#ifdef SHORT
+	/* do the same thing, but now on SHORT*4 blocks for the remaining data less
+	 * than a LONG*4 block
+	 */
+	while (len >= SHORT*4) {
+		crc1 = 0;
+		crc2 = 0;
+		crc3 = 0;
+		for (i = 0; i < SHORT / 8; i++) {
+			sse_crc32_u64(crc0, next);
+			sse_crc32_u64(crc1, next + SHORT);
+			sse_crc32_u64(crc2, next + SHORT * 2);
+			sse_crc32_u64(crc3, next + SHORT * 3);
+			next += 8;
+		}
+		crc0 = crc32c_shift(crc32c_short, (uint32_t)crc0) ^ crc1;
+		crc0 = crc32c_shift(crc32c_short, (uint32_t)crc0) ^ crc2;
+		crc0 = crc32c_shift(crc32c_short, (uint32_t)crc0) ^ crc3;
+		next += SHORT*3;
+		len -= SHORT*4;
+	}
+#endif
+
+	/* compute the crc on the remaining eight-byte units less than a SHORT*4
+	 * block
+	 */
+	while (len >= 8) {
+		sse_crc32_u64(crc0, next);
+		next += 8;
+		len -= 8;
+	}
+
+	/* compute the crc for up to seven trailing bytes */
+	while (len) {
+		sse_crc32_u8(crc0, next);
+		next++;
+		len--;
+	}
+
+	/* return a post-processed crc */
+	return (unsigned int)(crc0);
+}
+
+static int pcs_crc_may_inline_set(const char *val,
+				  const struct kernel_param *kp)
+{
+	unsigned long result;
+	int ret;
+
+	if (!pcs_crc32c_initialized)
+		return -EPERM;
+
+	ret = kstrtoul(val, 0, &result);
+	if (ret)
+		return ret;
+
+	WRITE_ONCE(pcs_crc_may_inline, !!result);
+	return 0;
+}
+
+static const struct kernel_param_ops pcs_crc_may_inline_ops = {
+	.set	= pcs_crc_may_inline_set,
+	.get	= param_get_int,
+};
+
+module_param_cb(pcs_crc_may_inline, &pcs_crc_may_inline_ops, &pcs_crc_may_inline,
+		0644);
+MODULE_PARM_DESC(pcs_crc_may_inline, "Avoid to use system libcrypto crc");
+
+#else /* CONFIG_X86_64 */
+
+void __init pcs_crc32c_init_hw(void)
+{
+	pcs_crc_may_inline = 0;
+}
+
+unsigned int pcs_crc32up_sse(unsigned int crc, const unsigned char *buf, unsigned int len)
+{
+	BUG();
+}
+
+#endif /* CONFIG_X86_64 */
diff --git a/fs/fuse/kio/pcs/pcs_crc32c.h b/fs/fuse/kio/pcs/pcs_crc32c.h
new file mode 100644
index 0000000..7682538
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_crc32c.h
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2003-2018 Acronis International GmbH.
+ */
+
+#ifndef __CRC32_H__
+#define __CRC32_H__
+
+extern int pcs_crc_may_inline;
+
+unsigned int pcs_crc32up_sse(unsigned int crc, const unsigned char *s, unsigned int len);
+void pcs_crc32c_init_hw(void);
+
+#endif /* __CRC32_H__ */
diff --git a/fs/fuse/kio/pcs/pcs_cs_accel.c b/fs/fuse/kio/pcs/pcs_cs_accel.c
index 6e8efae..de39ecf 100644
--- a/fs/fuse/kio/pcs/pcs_cs_accel.c
+++ b/fs/fuse/kio/pcs/pcs_cs_accel.c
@@ -21,6 +21,8 @@
 #include "pcs_cluster.h"
 #include "log.h"
 #include "fuse_ktrace.h"
+#include "pcs_krpc.h"
+#include "pcs_crc32c.h"
 
 static unsigned int crc_verify = 1;
 module_param(crc_verify, uint, 0644);
@@ -234,23 +236,24 @@ static int csa_update(struct pcs_csa_context * ctx, PCS_CHUNK_UID_T chunk_id, u3
 	return 0;
 }
 
-static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
+static int verify_crc(struct pcs_int_request *ireq, unsigned int size, u32 *crc)
 {
 	struct iov_iter * it = &ireq->iochunk.ar.iter;
-	unsigned int size = ireq->iochunk.size;
 	char crc_desc[sizeof(struct shash_desc) + 4] __aligned(__alignof__(struct shash_desc));
 	struct shash_desc *shash = (struct shash_desc *)crc_desc;
+	int use_shash = (READ_ONCE(pcs_crc_may_inline) <= 0);
 	int i;
 
 	shash->tfm = crc_tfm;
 
-	iov_iter_revert(it, size);
-
 	for (i = 0; i < size/4096; i++) {
 		unsigned int left = 4096;
 		u32 ccrc;
 
-		*(u32*)shash->__ctx = ~0U;
+		if (unlikely(use_shash))
+			*(u32 *)shash->__ctx = ~0U;
+		else
+			ccrc = ~0U;
 
 		do {
 			size_t offset;
@@ -260,13 +263,20 @@ static int verify_crc(struct pcs_int_request * ireq, u32 * crc)
 			len = iov_iter_get_pages2(it, &page, left, 1, &offset);
 			BUG_ON(len <= 0);
 
-			crypto_shash_alg(crc_tfm)->update(shash, kmap(page) + offset, len);
+			if (unlikely(use_shash))
+				crypto_shash_alg(crc_tfm)->update(shash, kmap(page) + offset, len);
+			else
+				ccrc = pcs_crc32up_sse(ccrc, kmap(page) + offset, len);
+
 			kunmap(page);
 			put_page(page);
 			left -= len;
 		} while (left > 0);
 
-		crypto_shash_alg(crc_tfm)->final(shash, (u8*)&ccrc);
+		if (unlikely(use_shash))
+			crypto_shash_alg(crc_tfm)->final(shash, (u8 *)&ccrc);
+		else
+			ccrc = ~ccrc;
 
 		if (ccrc != crc[i]) {
 			FUSE_KTRACE(ireq->cc->fc, "CRC error pg=%d@%u %08x %08x\n", i,
@@ -381,18 +391,67 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 		rcu_read_unlock();
 	}
 
+	ireq->crc_cpu = smp_processor_id();
+
 	if (areq->crc) {
 		if (!pcs_if_error(&ireq->error)) {
-			if (verify_crc(ireq, areq->crc))
-				pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
+			unsigned int sz = ireq->iochunk.size;
+
+			iov_iter_revert(&ireq->iochunk.ar.iter, sz);
+			if (verify_crc(ireq, sz, areq->crc))
+				pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1,
+							  ireq->iochunk.csl ?
+							  ireq->iochunk.csl->
+							  cs[ireq->iochunk.cs_index].info.id :
+							  (PCS_NODE_ID_T) {0});
 		}
 
 		if (areq->crc && areq->crc != areq->crcb) {
 			kfree(areq->crc);
 			areq->crc = NULL;
 		}
+	} else if (areq->crc_page[0]) {
+		if (!pcs_if_error(&ireq->error)) {
+			unsigned int sz = ireq->iochunk.size;
+			unsigned int off = (ireq->iochunk.offset / 4096) * 4;
+
+			iov_iter_revert(&ireq->iochunk.ar.iter, sz);
+
+			off &= (PAGE_SIZE - 1);
+			if (sz > (PAGE_SIZE - off)*(PAGE_SIZE / 4))
+				sz = (PAGE_SIZE - off)*(PAGE_SIZE / 4);
+
+			if (verify_crc(ireq, sz, kmap(areq->crc_page[0]) + off))
+				pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1,
+							  ireq->iochunk.csl ?
+							  ireq->iochunk.csl->
+							  cs[ireq->iochunk.cs_index].info.id :
+							  (PCS_NODE_ID_T) {0});
+			kunmap(areq->crc_page[0]);
+
+			sz = ireq->iochunk.size - sz;
+			if (sz) {
+				if (verify_crc(ireq, sz, kmap(areq->crc_page[1])))
+					pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1,
+								  ireq->iochunk.csl ?
+								  ireq->iochunk.csl->
+								  cs[ireq->iochunk.cs_index].info.id :
+								  (PCS_NODE_ID_T) {0});
+				kunmap(areq->crc_page[1]);
+			}
+		}
+
+		put_page(areq->crc_page[0]);
+		areq->crc_page[0] = NULL;
+		if (areq->crc_page[1]) {
+			put_page(areq->crc_page[1]);
+			areq->crc_page[1] = NULL;
+		}
 	}
 
+	if (!ireq->iochunk.csl)
+		goto out;
+
 	if (!pcs_if_error(&ireq->error)) {
 		struct fuse_conn * fc = ireq->cc->fc;
 
@@ -401,10 +460,15 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 			struct fuse_trace_hdr * t;
 
 			t = FUSE_TRACE_PREPARE(fc->ktrace, FUSE_KTRACE_IOTIMES, sizeof(struct fuse_tr_iotimes_hdr) +
-					       sizeof(struct fuse_tr_iotimes_cs));
+					       sizeof(struct fuse_tr_iotimes_cs) +
+					       sizeof(struct fuse_tr_attr) +
+					       sizeof(struct fuse_tr_iotimes_aux));
 			if (t) {
 				struct fuse_tr_iotimes_hdr * th = (struct fuse_tr_iotimes_hdr *)(t + 1);
 				struct fuse_tr_iotimes_cs * ch = (struct fuse_tr_iotimes_cs *)(th + 1);
+				struct fuse_tr_attr *aah = (struct fuse_tr_attr *)(ch + 1);
+				struct fuse_tr_iotimes_aux *ah =
+					(struct fuse_tr_iotimes_aux *)(aah + 1);
 
 				th->chunk = ireq->iochunk.chunk;
 				th->offset = ireq->iochunk.chunk + ireq->iochunk.offset;
@@ -415,13 +479,24 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 				th->ino = ireq->dentry->fileinfo.attr.id;
 				th->type = PCS_CS_READ_RESP;
 				th->cses = 1;
-				th->__pad = 0;
+				th->__pad = ((!!(ireq->flags & IREQ_F_REQUEUED)) << 7) |
+						smp_processor_id();
 				th->chid = (unsigned int)ireq->iochunk.map->id;
 
 				ch->csid = ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id.val | PCS_NODE_ALT_MASK;
 				ch->misc = ktime_to_us(ireq->ts_sent);
 				ch->ts_net = 0;
 				ch->ts_io = th->lat;
+
+				aah->magic = FUSE_TR_ATTR_MAGIC;
+				aah->type = FUSE_TR_ATTR_AUX;
+				aah->len = sizeof(struct fuse_tr_iotimes_aux);
+
+				ah->ts_dio = ktime_to_us(ktime_sub(ireq->ts_dio, ireq->ts_sent));
+				ah->cpus[0] = ireq->submit_cpu;
+				ah->cpus[1] = ireq->compl_cpu;
+				ah->cpus[2] = ireq->crcr_cpu;
+				ah->cpus[3] = ireq->crc_cpu;
 			}
 			FUSE_TRACE_COMMIT(fc->ktrace);
 		}
@@ -439,6 +514,7 @@ static void __pcs_csa_final_completion(struct pcs_aio_req *areq)
 		ireq->iochunk.msg.rpc = NULL;
 	}
 
+out:
 	ireq_complete(ireq);
 }
 
@@ -455,18 +531,39 @@ static inline int quick_crc_fetch(struct pcs_int_request * ireq, struct file * c
 	pgoff_t idx = offset / PAGE_SIZE;
 	struct page * page;
 
-	if (idx != ((offset + sz - 1) / PAGE_SIZE) || sz > sizeof(ireq->iochunk.ar.crcb))
+	if (idx + 1 < ((offset + sz - 1) / PAGE_SIZE))
 		return 0;
 
 	page = find_get_page(cfile->f_mapping, idx);
 	if (!page)
 		return 0;
 
-	memcpy(ireq->iochunk.ar.crcb, kmap(page) + (offset & (PAGE_SIZE-1)), sz);
-	ireq->iochunk.ar.crc = ireq->iochunk.ar.crcb;
-	kunmap(page);
-	put_page(page);
+	if (!PageUptodate(page)) {
+		put_page(page);
+		return 0;
+	}
+
+	ireq->iochunk.ar.crc_page[0] = page;
+
+	if (idx < ((offset + sz - 1) / PAGE_SIZE)) {
+		page = find_get_page(cfile->f_mapping, idx + 1);
+		if (!page)
+			goto dropout;
+
+		if (!PageUptodate(page)) {
+			put_page(page);
+			goto dropout;
+		}
+
+		ireq->iochunk.ar.crc_page[1] = page;
+	}
+
 	return 1;
+
+dropout:
+	put_page(ireq->iochunk.ar.crc_page[0]);
+	ireq->iochunk.ar.crc_page[0] = NULL;
+	return 0;
 }
 
 static void csa_crc_work(struct work_struct *w)
@@ -483,7 +580,11 @@ static void csa_crc_work(struct work_struct *w)
 		areq->crc = kmalloc(ncrc, GFP_KERNEL);
 		if (areq->crc == NULL) {
 out:
-			pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_NORES, 1, ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
+			pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_NORES, 1,
+						  ireq->iochunk.csl ?
+						  ireq->iochunk.csl->
+						  cs[ireq->iochunk.cs_index].info.id :
+						  (PCS_NODE_ID_T) {0});
 			fput(areq->cfile);
 			if (areq->crc && areq->crc != areq->crcb) {
 				kfree(areq->crc);
@@ -494,12 +595,18 @@ static void csa_crc_work(struct work_struct *w)
 		}
 	}
 
+	ireq->crcr_cpu = smp_processor_id();
+
 	pos = (ireq->iochunk.offset / 4096) * 4;
+	/* There is no acceptable interface to read pages from a file.
+	 * It is ridiculous, but the best thing the kernel offers is damn splice.
+	 */
 	sz = kernel_read(areq->cfile, areq->crc, ncrc, &pos);
 	if (sz != ncrc) {
 		FUSE_KTRACE(ireq->cc->fc, "Did not read crc res=%u expected=%u", (unsigned)sz, (unsigned)ncrc);
 		goto out;
 	}
+	FUSE_KDTRACE(ireq->cc->fc, "Read crc page %08lx at %llu", (unsigned long)areq->cfile, pos);
 	fput(areq->cfile);
 	areq->cfile = NULL;
 	pcs_csa_do_completion(areq);
@@ -519,9 +626,15 @@ static void pcs_csa_complete(struct kiocb *iocb, long ret)
 
 	areq = container_of(iocb, struct pcs_aio_req, iocb);
 	ireq = container_of(areq, struct pcs_int_request, iochunk.ar);
+	ireq->compl_cpu = smp_processor_id();
+	ireq->ts_dio = ktime_get();
 
 	if (ret != ireq->iochunk.size)
-		pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1, ireq->iochunk.csl->cs[ireq->iochunk.cs_index].info.id);
+		pcs_set_error_cond_atomic(&ireq->error, PCS_ERR_IO, 1,
+					  ireq->iochunk.csl ?
+					  ireq->iochunk.csl->
+					  cs[ireq->iochunk.cs_index].info.id :
+					  (PCS_NODE_ID_T) {0});
 
 	if (atomic_dec_and_test(&areq->iocount)) {
 		INIT_WORK(&areq->work, csa_complete_work);
@@ -541,6 +654,9 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 
 	areq->cfile = NULL;
 	areq->crc = NULL;
+	areq->crc_page[0] = NULL;
+	areq->crc_page[1] = NULL;
+	ireq->crcr_cpu = 0xFF;
 
 	if (do_csum) {
 		if (cfile == NULL)
@@ -552,7 +668,6 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 		quick_crc_fetch(ireq, cfile);
 	}
 
-	BUG_ON(parent->type != PCS_IREQ_API);
 	ar = parent->apireq.req;
 
 	ar->get_iter(ar->datasource, ireq->iochunk.dio_offset, it, READ);
@@ -572,6 +687,7 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	/* One ref is ours, other is for AIO. If crc read is needed we will grab the third */
 	atomic_set(&areq->iocount, 2);
 
+	ireq->submit_cpu = smp_processor_id();
 	ret = call_read_iter(file, iocb, it);
 
 	if (unlikely(ret != -EIOCBQUEUED)) {
@@ -597,8 +713,9 @@ static inline int csa_submit(struct file * file, struct file *cfile, int do_csum
 	}
 
 	/* Successful or queued read. Need to start crc read, if it is not ready already */
-	if (do_csum && !areq->crc) {
-		FUSE_KTRACE(ireq->cc->fc, "Not a quicky crc");
+	if (do_csum && !areq->crc_page[0]) {
+		FUSE_KDTRACE(ireq->cc->fc, "Not a quicky crc %lu at %u",
+		(unsigned long)cfile, (unsigned int)ireq->iochunk.offset);
 		INIT_WORK(&areq->work, csa_crc_work);
 		/* Grab ref for crc read work */
 		atomic_inc(&areq->iocount);
@@ -781,7 +898,8 @@ static void __complete_acr_work(struct work_struct * w)
 				th->ino = ireq->dentry->fileinfo.attr.id;
 				th->type = PCS_CS_WRITE_AL_RESP;
 				th->cses = n;
-				th->__pad = 0;
+				th->__pad = ((!!(ireq->flags & IREQ_F_REQUEUED)) << 7) |
+				 smp_processor_id();
 				th->chid = (unsigned int)ireq->iochunk.map->id;
 
 				for (i = 0; i < n; i++, ch++)
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 7e5d3d4..0aeb166 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -39,6 +39,7 @@
 #include "pcs_net_addr.h"
 #include "pcs_mr.h"
 #include "pcs_krpc.h"
+#include "pcs_crc32c.h"
 
 unsigned int pcs_loglevel = LOG_TRACE;
 module_param(pcs_loglevel, uint, 0644);
@@ -2083,6 +2084,8 @@ static int __init kpcs_mod_init(void)
 
 	fuse_trace_root = debugfs_create_dir("fuse", NULL);
 
+	pcs_crc32c_init_hw();
+
 	crc_tfm = crypto_alloc_shash("crc32c-intel", 0, 0);
 	if (!crc_tfm || IS_ERR(crc_tfm))
 		crc_tfm = crypto_alloc_shash("crc32c", 0, 0);
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index 1503e59..09eeb9d 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -21,8 +21,6 @@
 #include "pcs_mr.h"
 #include "pcs_krpc.h"
 
-///////////////////////////
-
 enum
 {
 	PCS_IREQ_API	= 0,	/* IO request from API */
@@ -57,6 +55,7 @@ struct pcs_aio_req
 	struct work_struct	work;
 
 	u32    			*crc;
+	struct page		*crc_page[2];
 	struct file		*cfile;
 #define PCS_MAX_INLINE_CRC	32
 	u32    			crcb[PCS_MAX_INLINE_CRC];
@@ -138,6 +137,8 @@ struct pcs_int_request
 	int			qdepth;
 	ktime_t			ts;
 	ktime_t			ts_sent;
+	ktime_t			ts_dio;
+	u8			submit_cpu, compl_cpu, crcr_cpu, crc_cpu;
 	PCS_NODE_ID_T		wait_origin;
 
 	struct {
-- 
1.8.3.1



More information about the Devel mailing list