[Devel] [PATCH RHEL8 COMMIT] fs/fuse kio: implement internal cs connection

Konstantin Khorenko khorenko at virtuozzo.com
Thu Oct 15 10:37:33 MSK 2020


The commit is pushed to "branch-rh8-4.18.0-193.6.3.vz8.4.x-ovz" and will appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh8-4.18.0-193.6.3.vz8.4.13
------>
commit 2832a81ede0cc22ffaeafbc4e8f262cb18fc3862
Author: Pavel Butsykin <pbutsykin at virtuozzo.com>
Date:   Thu Oct 15 10:37:33 2020 +0300

    fs/fuse kio: implement internal cs connection
    
    This patch moves the creation of the socket connection to the CS from user
    space to the kernel, in order to implement RDMA in Fast-Path. Together with
    the connection setup, this patch also moves the 'auth digest' handshake,
    which is necessary to identify the client.
    
    In contrast to the user-mode implementation, the handshake in this patch is
    implemented synchronously, which made it possible to reduce and simplify the code.
    
    https://pmc.acronis.com/browse/VSTOR-4310
    
    Signed-off-by: Pavel Butsykin <pbutsykin at virtuozzo.com>
    
    =======================================================
    Patchset description:
    fs/fuse kio: Add RDMA support
    
    This patch set adds RDMA transport support to the kdirect PCS IO engine
    
    Ildar Ismagilov (3):
      fs/fuse kio: make pcs rpc socket independent
      fs/fuse kio: make pcs auth socket independent
      fs/fuse kio: implement support RDMA transport
    
    Pavel Butsykin (2):
      fs/fuse kio: implement internal cs connection
      fs/fuse kio: drop external cs connection
---
 fs/fuse/Makefile                   |   3 +-
 fs/fuse/kio/pcs/pcs_cluster.c      |   9 +-
 fs/fuse/kio/pcs/pcs_cluster.h      |   4 +-
 fs/fuse/kio/pcs/pcs_cluster_core.c |   7 +-
 fs/fuse/kio/pcs/pcs_cs.c           |  47 ++-
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c |   2 +-
 fs/fuse/kio/pcs/pcs_ioctl.h        |   3 +-
 fs/fuse/kio/pcs/pcs_req.h          |   2 +
 fs/fuse/kio/pcs/pcs_rpc.c          |   8 +-
 fs/fuse/kio/pcs/pcs_rpc.h          |  16 +
 fs/fuse/kio/pcs/pcs_rpc_prot.h     |  21 ++
 fs/fuse/kio/pcs/pcs_sock_conn.c    | 672 +++++++++++++++++++++++++++++++++++++
 fs/fuse/kio/pcs/pcs_sock_conn.h    |   7 +
 fs/fuse/kio/pcs/pcs_sock_io.c      |  47 ++-
 fs/fuse/kio/pcs/pcs_sock_io.h      |   9 +
 fs/fuse/kio/pcs/pcs_types.h        |   1 +
 16 files changed, 821 insertions(+), 37 deletions(-)

diff --git a/fs/fuse/Makefile b/fs/fuse/Makefile
index d69ad2c8176d..1dd2d73d48f6 100644
--- a/fs/fuse/Makefile
+++ b/fs/fuse/Makefile
@@ -26,7 +26,8 @@ fuse_kio_pcs-objs := kio/pcs/pcs_fuse_kdirect.o \
 	kio/pcs/pcs_cluster_core.o \
 	kio/pcs/pcs_cs.o \
 	kio/pcs/fuse_io.o \
-	kio/pcs/fuse_stat.o
+	kio/pcs/fuse_stat.o \
+	kio/pcs/pcs_sock_conn.o
 
 fuse-objs := dev.o dir.o file.o inode.o control.o xattr.o acl.o readdir.o
 virtiofs-y += virtio_fs.o
diff --git a/fs/fuse/kio/pcs/pcs_cluster.c b/fs/fuse/kio/pcs/pcs_cluster.c
index 65d9288cb88d..08f1a7dd6d51 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.c
+++ b/fs/fuse/kio/pcs/pcs_cluster.c
@@ -588,19 +588,18 @@ static int ireq_check_redo_(struct pcs_int_request *ireq)
 }
 
 int pcs_cluster_init(struct pcs_fuse_cluster *pfc, struct workqueue_struct *wq,
-		     struct fuse_conn *fc, PCS_CLUSTER_ID_T *cl_id,
-		     PCS_NODE_ID_T *id)
+		     struct fuse_conn *fc, struct pcs_ioc_init_kdirect *info)
 {
 	struct pcs_cluster_core_attr attr;
 
-	attr.cluster = *cl_id;
-	attr.node = *id;
+	attr.cluster = info->cluster_id;
+	attr.node = info->node_id;
 	attr.abort_timeout_ms = 0;
 
 	pfc->fc = fc;
 
 	/* core init */
-	if (pcs_cc_init(&pfc->cc, wq, NULL, &attr))
+	if (pcs_cc_init(&pfc->cc, wq, info->cluster_name, &attr))
 		return -1;
 	pfc->cc.fc = fc;
 	pfc->cc.op.ireq_process	   = ireq_process_;
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index de9897e737c4..65b34f8b657f 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -2,6 +2,7 @@
 #define _PCS_CLUSTER_H_ 1
 
 #include "pcs_req.h"
+#include "pcs_ioctl.h"
 #include "../../fuse_i.h"
 struct fuse_conn;
 
@@ -55,8 +56,7 @@ struct pcs_fuse_work {
 extern struct workqueue_struct *pcs_cleanup_wq;
 
 int pcs_cluster_init(struct pcs_fuse_cluster *c, struct workqueue_struct *,
-		     struct fuse_conn *fc, PCS_CLUSTER_ID_T *cl_id,
-		     PCS_NODE_ID_T *id);
+		     struct fuse_conn *fc, struct pcs_ioc_init_kdirect *info);
 void pcs_cluster_fini(struct pcs_fuse_cluster *c);
 
 extern void fiemap_work_func(struct work_struct *w);
diff --git a/fs/fuse/kio/pcs/pcs_cluster_core.c b/fs/fuse/kio/pcs/pcs_cluster_core.c
index 010d6588c99b..180e5f726de5 100644
--- a/fs/fuse/kio/pcs/pcs_cluster_core.c
+++ b/fs/fuse/kio/pcs/pcs_cluster_core.c
@@ -120,9 +120,9 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
 		const char *cluster_name, struct pcs_cluster_core_attr *attr)
 {
 	int err;
-	/* Ignore this for now, i have cluter_id and node_id*/
-	/* if (cluster_name == NULL) */
-	/*	   return -1; */
+
+	if (!cluster_name)
+		return -EINVAL;
 
 	spin_lock_init(&cc->lock);
 	INIT_LIST_HEAD(&cc->work_queue);
@@ -131,6 +131,7 @@ int pcs_cc_init(struct pcs_cluster_core *cc, struct workqueue_struct *wq,
 	INIT_WORK(&cc->completion_job, cc_completion_handler);
 	INIT_WORK(&cc->fiemap_work, fiemap_work_func);
 	cc->wq = wq;
+	snprintf(cc->cluster_name, sizeof(cc->cluster_name), "%s", cluster_name);
 
 	pcs_csset_init(&cc->css);
 
diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index cdad11f5e70f..79b11a52c438 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -13,6 +13,7 @@
 #include "pcs_cs.h"
 #include "pcs_cs_prot.h"
 #include "pcs_cluster.h"
+#include "pcs_sock_conn.h"
 #include "pcs_ioctl.h"
 #include "log.h"
 #include "fuse_ktrace.h"
@@ -34,7 +35,8 @@ static void cs_aborting(struct pcs_rpc *ep, int error);
 static struct pcs_msg *cs_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h);
 static int cs_input(struct pcs_rpc *ep, struct pcs_msg *msg);
 static void cs_keep_waiting(struct pcs_rpc *ep, struct pcs_msg *req, struct pcs_msg *msg);
-static void cs_connect(struct pcs_rpc *ep);
+static void cs_user_connect(struct pcs_rpc *ep);
+static void cs_kernel_connect(struct pcs_rpc *ep);
 static void pcs_cs_isolate(struct pcs_cs *cs, struct list_head *dispose);
 static void pcs_cs_destroy(struct pcs_cs *cs);
 
@@ -43,7 +45,7 @@ struct pcs_rpc_ops cn_rpc_ops = {
 	.get_hdr		= cs_get_hdr,
 	.state_change		= cs_aborting,
 	.keep_waiting		= cs_keep_waiting,
-	.connect		= cs_connect,
+	.connect		= cs_kernel_connect,
 };
 
 static int pcs_cs_percpu_stat_alloc(struct pcs_cs *cs)
@@ -434,7 +436,46 @@ static void cs_get_read_response_iter(struct pcs_msg *msg, int offset, struct io
 	}
 }
 
-static void cs_connect(struct pcs_rpc *ep)
+/*
+ * rpc_ops->connect for CS endpoints: fill ep->sh with the peer address and
+ * start an in-kernel socket connection with pcs_sockconnect_start().
+ *
+ * Peers flagged PCS_RPC_F_LOCAL are reached over an AF_UNIX socket at
+ * PCS_SHM_DIR "/<peer id>_<cluster id>"; if that path does not fit into
+ * sun_path, PCS_RPC_F_LOCAL is cleared and the endpoint is reset.  Other
+ * peers use their IP/IPv6 address; RDMA is not implemented yet.
+ * On any failure the endpoint is reset with pcs_rpc_reset().
+ */
+static void cs_kernel_connect(struct pcs_rpc *ep)
+{
+	if (ep->flags & PCS_RPC_F_LOCAL) {
+		char path[128];
+		int len;
+
+		len = snprintf(path, sizeof(path), PCS_SHM_DIR "/%llu_" CLUSTER_ID_FMT,
+			       (unsigned long long)ep->peer_id.val,
+			       CLUSTER_ID_ARGS(ep->eng->cluster_id));
+		/*
+		 * snprintf() returns the untruncated length: the path plus its
+		 * terminating NUL must fit into sun_path.
+		 */
+		if (len < 0 || (size_t)len >= sizeof(ep->sh.sun.sun_path)) {
+			TRACE("Path to local socket is too long: %s", path);
+
+			ep->flags &= ~PCS_RPC_F_LOCAL;
+			goto fail;
+		}
+		memset(&ep->sh.sun, 0, sizeof(struct sockaddr_un));
+		ep->sh.sun.sun_family = AF_UNIX;
+		ep->sh.sa_len = sizeof(struct sockaddr_un);
+		memcpy(ep->sh.sun.sun_path, path, len + 1);
+	} else if (ep->addr.type == PCS_ADDRTYPE_RDMA) {
+		/* TODO: rdma connect init */
+		WARN_ON_ONCE(1);
+		goto fail;
+	} else {
+		/* TODO: print sock addr using pcs_format_netaddr() */
+		if (pcs_netaddr2sockaddr(&ep->addr, &ep->sh.sa, &ep->sh.sa_len)) {
+			TRACE("netaddr to sockaddr failed");
+			goto fail;
+		}
+	}
+	ep->state = PCS_RPC_CONNECT;
+	pcs_sockconnect_start(ep); /* TODO: rewrite to use pcs_netconnect callback */
+	return;
+fail:
+	pcs_rpc_reset(ep);
+}
+
+__maybe_unused static void cs_user_connect(struct pcs_rpc *ep)
 {
 	struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
 	struct pcs_fuse_cluster *pfc = pcs_cluster_from_cc(cc);
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 64ddd24a7525..e5f417387572 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -103,7 +103,7 @@ static void process_pcs_init_reply(struct fuse_conn *fc, struct fuse_args *args,
 		goto out;
 	}
 
-	if (pcs_cluster_init(pfc, pcs_wq, fc, &info->cluster_id, &info->node_id)) {
+	if (pcs_cluster_init(pfc, pcs_wq, fc, info)) {
 		fc->conn_error = 1;
 		kvfree(pfc);
 		goto out;
diff --git a/fs/fuse/kio/pcs/pcs_ioctl.h b/fs/fuse/kio/pcs/pcs_ioctl.h
index 31d5e92cdd93..b86d670466cc 100644
--- a/fs/fuse/kio/pcs/pcs_ioctl.h
+++ b/fs/fuse/kio/pcs/pcs_ioctl.h
@@ -10,7 +10,7 @@
 #include "pcs_map.h"
 #include "pcs_rpc.h"
 
-#define PCS_FAST_PATH_VERSION ((PCS_FAST_PATH_VERSION_T){{1, 2}})
+#define PCS_FAST_PATH_VERSION ((PCS_FAST_PATH_VERSION_T){{1, 3}})
 
 #define PCS_FUSE_INO_SPECIAL_ ((unsigned long long)-0x1000)
 
@@ -38,6 +38,7 @@ struct pcs_ioc_init_kdirect
 	PCS_NODE_ID_T node_id;
 	PCS_CLUSTER_ID_T cluster_id;
 	PCS_FAST_PATH_VERSION_T version;
+	char cluster_name[NAME_MAX];
 };
 
 struct pcs_ioc_fileinfo
diff --git a/fs/fuse/kio/pcs/pcs_req.h b/fs/fuse/kio/pcs/pcs_req.h
index f8f52641c11d..f8b4625c0499 100644
--- a/fs/fuse/kio/pcs/pcs_req.h
+++ b/fs/fuse/kio/pcs/pcs_req.h
@@ -237,6 +237,8 @@ struct pcs_cluster_core
 	int (*abort_callback)(struct pcs_cluster_core *cc, struct pcs_int_request *ireq);
 	struct fuse_conn *fc;
 	spinlock_t		lock;
+
+	char cluster_name[NAME_MAX];
 };
 
 static inline struct pcs_cluster_core *cc_from_csset(struct pcs_cs_set * css)
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 1d1b6b9d26a1..b419915bddc1 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -245,6 +245,7 @@ void rpc_abort(struct pcs_rpc * ep, int fatal, int error)
 /* Client close. */
 void pcs_rpc_close(struct pcs_rpc * ep)
 {
+	TRACE("pcs_rpc_close");
 	mutex_lock(&ep->mutex);
 	BUG_ON(ep->flags & PCS_RPC_F_DEAD);
 	BUG_ON(ep->flags & PCS_RPC_F_PASSIVE);
@@ -356,7 +357,7 @@ void __pcs_rpc_put(struct pcs_rpc *ep)
 		queue_work(pcs_cleanup_wq, &rpc_cleanup_work);
 }
 
-static void rpc_eof_cb(struct pcs_sockio * sio)
+void rpc_eof_cb(struct pcs_sockio * sio)
 {
 	struct pcs_rpc * ep = sio->parent;
 
@@ -410,7 +411,7 @@ void pcs_rpc_error_respond(struct pcs_rpc * ep, struct pcs_msg * msg, int err)
 /* After client gets csconn_complete() callback, he makes some actions and completes switch
  * to WORK state calling this function.
  */
-static void pcs_rpc_enable(struct pcs_rpc * ep, int error)
+void pcs_rpc_enable(struct pcs_rpc * ep, int error)
 {
 	struct pcs_cluster_core *cc = cc_from_rpc(ep->eng);
 
@@ -433,6 +434,7 @@ static void pcs_rpc_enable(struct pcs_rpc * ep, int error)
 	}
 	TRACE("ep(%p)->state: WORK\n", ep);
 	ep->state = PCS_RPC_WORK;
+	ep->retries = 0;
 	queue_work(cc->wq, &ep->work);
 }
 
@@ -1408,7 +1410,6 @@ void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock)
 
 	TRACE(PEER_FMT " ->state:%d sock:%p\n", PEER_ARGS(ep), ep->state, sock);
 	cancel_delayed_work(&ep->timer_work);
-	ep->retries++;
 
 	if (ep->state != PCS_RPC_CONNECT) {
 		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR, "Invalid state: %u", ep->state);
@@ -1425,7 +1426,6 @@ void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock)
 	sio->get_msg = rpc_get_hdr;
 	sio->eof = rpc_eof_cb;
 	//pcs_ioconn_register(ep->conn);
-	ep->retries = 0;
 	if (ep->gc)
 		list_lru_add(&ep->gc->lru, &ep->lru_link);
 
diff --git a/fs/fuse/kio/pcs/pcs_rpc.h b/fs/fuse/kio/pcs/pcs_rpc.h
index 119de48d4376..b0b9b42f2349 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.h
+++ b/fs/fuse/kio/pcs/pcs_rpc.h
@@ -7,6 +7,10 @@
 
 struct pcs_msg;
 
+/*
+ * Directory holding local (AF_UNIX) peer sockets: "/dev/shm/vstorage".
+ * PCS_SHM_DIR is left unparenthesized on purpose so that it can be
+ * concatenated with further string literals.
+ */
+#define PCS_PRODUCT_NAME	"vstorage"
+#define PCS_DEV_SHM		"/dev/shm"
+#define PCS_SHM_DIR		PCS_DEV_SHM "/" PCS_PRODUCT_NAME
+
 #define PCS_RPC_HASH_SIZE	1024
 
 enum
@@ -103,6 +107,15 @@ struct pcs_rpc
 	struct sockaddr_un *	sun;
 #endif
 	struct pcs_ioconn *	conn;		/* Active connection for the peer */
+	struct {
+		int sa_len;
+		union {
+			struct sockaddr sa;
+			struct sockaddr_in saddr4;
+			struct sockaddr_in6 saddr6;
+			struct sockaddr_un sun;
+		};
+	} sh;
 
 	struct pcs_rpc_ops *	ops;
 
@@ -272,6 +285,7 @@ void pcs_rpc_init_response(struct pcs_msg * msg, struct pcs_rpc_hdr * req_hdr, i
 
 /* Allocate message and initialize header */
 struct pcs_msg * pcs_rpc_alloc_msg_w_hdr(int type, int size);
+struct pcs_msg *rpc_get_hdr(struct pcs_sockio * sio, u32 *msg_size);
 
 void pcs_rpc_set_memlimits(struct pcs_rpc_engine * eng, u64 thresh, u64 limit);
 void pcs_rpc_account_adjust(struct pcs_msg * msg, int adjustment);
@@ -286,6 +300,8 @@ void rpc_trace_health(struct pcs_rpc * ep);
 void pcs_rpc_enumerate_rpc(struct pcs_rpc_engine *eng, void (*cb)(struct pcs_rpc *ep, void *arg), void *arg);
 void pcs_rpc_set_sock(struct pcs_rpc *ep, struct pcs_sockio * sio);
 void rpc_connect_done(struct pcs_rpc *ep, struct socket *sock);
+void pcs_rpc_enable(struct pcs_rpc * ep, int error);
+void rpc_eof_cb(struct pcs_sockio *sio);
 
 static inline struct pcs_rpc *pcs_rpc_from_work(struct work_struct *wr)
 {
diff --git a/fs/fuse/kio/pcs/pcs_rpc_prot.h b/fs/fuse/kio/pcs/pcs_rpc_prot.h
index e32e7912943e..7dd33012dbe4 100644
--- a/fs/fuse/kio/pcs/pcs_rpc_prot.h
+++ b/fs/fuse/kio/pcs/pcs_rpc_prot.h
@@ -62,6 +62,27 @@ struct pcs_rpc_error_resp
 /* Application specific payload types */
 #define PCS_RPC_APP_PAYLOAD_BASE	512
 
+/*
+ * Auth req/resp flags: bit mask carried in the 'flags' byte of auth
+ * request/response messages.
+ */
+enum
+{
+	PCS_RPC_AUTH_F_VOID_SENDER	= 1,		/* sender_id is undefined */
+	PCS_RPC_AUTH_F_VOID_RECIPIENT	= 2,		/* recipient_id is undefined */
+	PCS_RPC_AUTH_F_ACQ_SENDER	= 4,		/* sender asks to be assigned an id; recipient must be MDS.
+							 * NOTE: it is not clear whether initialization should
+							 * be part of the auth phase, e.g. CS registration
+							 * could be done within the auth phase.
+							 */
+	PCS_RPC_AUTH_F_BOOTSTRAP	= 8,		/* Not a real MDS (e.g. no quorum); can only return
+							 * advice about other MDSs' locations
+							 */
+	PCS_RPC_AUTH_F_SLAVE		= 16,		/* MDS is authentic, but will not accept requests that
+							 * change state.
+							 */
+	PCS_RPC_AUTH_F_VOID_CLUSTERID	= 32,		/* cluster_id is undefined */
+
+	PCS_RPC_AUTH_F_AUTHORISED	= 64,		/* client was successfully authorised on MDS */
+};
+
 /* Node role */
 enum
 {
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.c b/fs/fuse/kio/pcs/pcs_sock_conn.c
new file mode 100644
index 000000000000..bee9972dec17
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.c
@@ -0,0 +1,672 @@
+#include <net/sock.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/tcp.h>
+
+#include <crypto/algapi.h>
+#include <crypto/hash.h>
+#include <crypto/md5.h>
+
+#include "pcs_types.h"
+#include "pcs_sock_io.h"
+#include "pcs_rpc.h"
+#include "pcs_cluster.h"
+#include "log.h"
+#include "fuse_ktrace.h"
+
+/* Per-cluster configuration lives under PCS_CFG_DIR/clusters/<cluster name>/ */
+#define PCS_CFG_DIR		"/etc/vstorage"
+/* NOTE(review): AUTH_DIGEST_NAME{,_LEN} are not referenced in this file */
+#define AUTH_DIGEST_NAME	"digest"
+#define AUTH_DIGEST_NAME_LEN	(sizeof(AUTH_DIGEST_NAME) - 1)
+/* Shared digest-auth key file, read by pcs_load_keyfile_auth() */
+#define PCS_KEYPATH_FMT		(PCS_CFG_DIR"/clusters/%s/auth_digest.key")
+/* File flock()ed by lock_key_file() while the key file is read */
+#define LOCK_FILE_PATH_FMT	(PCS_CFG_DIR"/clusters/%s/.digest_auth.lock")
+
+/* Handshake states, sent in the 'state' field of auth messages */
+enum
+{
+	PCS_AUTH_INITIAL = 0,	/* Basic states of the auth handshake required to establish an SSL connection;
+				 * other authentication protocols can require more states
+				 */
+	PCS_AUTH_SEND_HELLO,	/* Client sends hello at connect */
+	PCS_AUTH_SEND_SRV_CERT,	/* Server sends its cert. to client */
+	PCS_AUTH_SEND_CN_CERT,	/* Client sends its cert. to server */
+	PCS_AUTH_SRV_ACCEPT,	/* Server accepts client's cert. */
+};
+
+/* Sizes, in bytes, of the digest handshake fields */
+#define DIGEST_AUTH_ID_LEN 32
+#define DIGEST_SALT_LEN 16
+#define DIGEST_KEY_LEN 32
+
+/* HELLO payload: MD5 of the cluster name */
+__pre_packed struct digest_hello_msg {
+	unsigned char md5_cn[MD5_DIGEST_SIZE];
+} __packed;
+
+/*
+ * Server reply to HELLO: a salt, which the client compares with the salt
+ * stored after the key in the key file, and an id that the client HMACs
+ * with the shared key.
+ */
+__pre_packed struct digest_srv_salt_msg {
+	unsigned char salt[DIGEST_SALT_LEN];
+	unsigned char id[DIGEST_AUTH_ID_LEN];
+} __packed;
+
+/*
+ * Capacity of an HMAC buffer.  NOTE(review): despite the name,
+ * pcs_generate_hmac() computes hmac(sha1), whose digest is shorter.
+ */
+#define HMAC_SHA512_HSIZE 64U
+
+/*
+ * An HMAC value.  On the client side 'size' is set to the digest size by
+ * pcs_generate_hmac(); only the first 'size' bytes of 'data' are meaningful.
+ */
+__pre_packed struct digest_hmac_msg {
+	unsigned int size;
+	unsigned char data[HMAC_SHA512_HSIZE];
+} __packed;
+
+/* Client reply to the salt message: a random client id plus HMAC(key, server id) */
+__pre_packed struct digest_msg {
+	unsigned char id[DIGEST_AUTH_ID_LEN];
+	struct digest_hmac_msg hmac;
+} __packed;
+
+/*
+ * Compute HMAC-SHA1 of @in (@in_sz bytes) keyed with @key (@key_sz bytes).
+ *
+ * The digest is written to @out, which must have room for the algorithm's
+ * digest size.  On success that size is stored in *@out_sz and 0 is
+ * returned; otherwise a negative errno is returned and *@out_sz is left
+ * untouched.
+ */
+static int pcs_generate_hmac(u8 *key, size_t key_sz, u8 *in, size_t in_sz,
+			     u8 *out, u32 *out_sz)
+{
+	struct crypto_shash *hmacalg;
+	struct shash_desc *shash;
+	int ret;
+
+	hmacalg = crypto_alloc_shash("hmac(sha1)", 0, 0);
+	if (IS_ERR(hmacalg)) {
+		TRACE("hmacalg: could not allocate crypto %ld", PTR_ERR(hmacalg));
+		return PTR_ERR(hmacalg);
+	}
+
+	ret = crypto_shash_setkey(hmacalg, key, key_sz);
+	if (ret) {
+		TRACE("crypto_shash_setkey failed: err %d", ret);
+		goto fail1;
+	}
+
+	/* The descriptor is followed by the algorithm's per-request state */
+	shash = kzalloc(sizeof(*shash) + crypto_shash_descsize(hmacalg),
+			GFP_KERNEL);
+	if (!shash) {
+		ret = -ENOMEM;
+		goto fail1;
+	}
+
+	shash->tfm = hmacalg;
+	shash->flags = CRYPTO_TFM_REQ_MAY_SLEEP;
+
+	ret = crypto_shash_digest(shash, in, in_sz, out);
+	if (ret)
+		TRACE("crypto_shash_digest failed: %d", ret);
+	else
+		*out_sz = crypto_shash_digestsize(hmacalg);
+
+	/* The request state is derived from the secret key: wipe it */
+	kzfree(shash);
+fail1:
+	crypto_free_shash(hmacalg);
+	return ret;
+}
+
+/*
+ * Check that @data, as received from the server, is HMAC(@key, @digest->id).
+ *
+ * Only the computed digest length is compared; the peer-supplied @data_sz is
+ * not trusted (@data must provide at least that many bytes, as
+ * digest_hmac_msg.data does).
+ *
+ * Returns 1 if the HMAC matches and 0 otherwise, including when the HMAC
+ * could not be computed.  Callers that test "!pcs_validate_hmac()" therefore
+ * never accept a server on an internal error.
+ */
+static int pcs_validate_hmac(struct digest_msg *digest, u8 *key, size_t key_sz,
+			     u8 *data, u32 data_sz)
+{
+	u8 hmac[HMAC_SHA512_HSIZE];
+	u32 hmac_sz = sizeof(hmac);
+	int err;
+
+	err = pcs_generate_hmac(key, key_sz, digest->id, sizeof(digest->id),
+				hmac, &hmac_sz);
+	if (err) {
+		TRACE("Can't compute HMAC for validation, err: %d", err);
+		return 0;
+	}
+
+	/* Constant-time comparison: do not leak how many bytes matched */
+	return !crypto_memneq(hmac, data, min(hmac_sz, HMAC_SHA512_HSIZE));
+}
+
+/*
+ * Compute the MD5 digest of @data (@len bytes) into @result, which must hold
+ * MD5_DIGEST_SIZE bytes.  Returns 0 or a negative errno.
+ */
+static int pcs_md5_hash(char *result, char *data, size_t len)
+{
+	struct crypto_shash *tfm;
+	struct shash_desc *desc;
+	int err;
+
+	tfm = crypto_alloc_shash("md5", 0, CRYPTO_ALG_ASYNC);
+	if (IS_ERR(tfm))
+		return PTR_ERR(tfm);
+
+	/*
+	 * struct shash_desc has no room for the hash state itself: the
+	 * algorithm's context of crypto_shash_descsize() bytes follows it.
+	 */
+	desc = kzalloc(sizeof(*desc) + crypto_shash_descsize(tfm), GFP_KERNEL);
+	if (!desc) {
+		err = -ENOMEM;
+		goto out_free_tfm;
+	}
+	desc->tfm = tfm;
+
+	err = crypto_shash_digest(desc, (const u8 *)data, len, (u8 *)result);
+
+	kfree(desc);
+out_free_tfm:
+	crypto_free_shash(tfm);
+	return err;
+}
+
+/*
+ * Open (creating if needed) the per-cluster digest-auth lock file and take an
+ * exclusive flock() on it, sleeping until the lock is granted.
+ *
+ * Returns the open file, or an ERR_PTR() on failure.  The caller releases the
+ * lock by closing the file with filp_close(); flock locks go away when the
+ * last reference to the file is dropped.
+ */
+static struct file *lock_key_file(char *cluster_name)
+{
+	char lockfile[sizeof(LOCK_FILE_PATH_FMT) + NAME_MAX];
+	struct file_lock *lock;
+	struct file *f;
+	int err;
+
+	snprintf(lockfile, sizeof(lockfile), LOCK_FILE_PATH_FMT, cluster_name);
+	f = filp_open(lockfile, O_CREAT | O_RDONLY | O_CLOEXEC,
+		      S_IRUSR | S_IRGRP | S_IROTH);
+	if (IS_ERR(f))
+		return f;
+
+	lock = locks_alloc_lock();
+	if (!lock) {
+		filp_close(f, NULL);
+		return ERR_PTR(-ENOMEM);
+	}
+	/* Fill in the request the same way flock(2) does */
+	lock->fl_file = f;
+	lock->fl_owner = f;
+	lock->fl_pid = current->tgid;
+	lock->fl_flags = FL_FLOCK;
+	lock->fl_type = F_WRLCK;
+	lock->fl_end = OFFSET_MAX;
+
+	err = locks_lock_file_wait(f, lock);
+	/*
+	 * The request is only a template: the lock code installs its own
+	 * copy, so the request is always owned (and freed) by the caller.
+	 */
+	locks_free_lock(lock);
+	if (err < 0) {
+		filp_close(f, NULL);
+		return ERR_PTR(err);
+	}
+	return f;
+}
+
+/*
+ * Read the first @len bytes of the per-cluster digest-auth key file into
+ * @key_out, under the exclusive lock taken by lock_key_file().
+ *
+ * Returns 0 on success and a negative errno if the lock or key file cannot
+ * be opened or read.  Returns -EINVAL if the file holds fewer than @len
+ * bytes, so callers never use a partially filled key.
+ */
+static int pcs_load_keyfile_auth(char *cluster_name, u8 *key_out, u32 len)
+{
+	char keyfile[sizeof(PCS_KEYPATH_FMT) + NAME_MAX];
+	struct file *f, *flock;
+	loff_t offs = 0;
+	int err;
+
+	flock = lock_key_file(cluster_name);
+	if (IS_ERR(flock)) {
+		TRACE("Lock keyfile failed: %ld", PTR_ERR(flock));
+		return PTR_ERR(flock);
+	}
+
+	snprintf(keyfile, sizeof(keyfile), PCS_KEYPATH_FMT, cluster_name);
+
+	f = filp_open(keyfile, O_RDONLY, 0);
+	if (IS_ERR(f)) {
+		err = PTR_ERR(f);
+		TRACE("Can't open keyfile auth: %s, err: %d", keyfile, err);
+		goto out;
+	}
+
+	err = kernel_read(f, key_out, len, &offs);
+	if (err < 0) {
+		TRACE("Can't read keyfile: %s, err: %d", keyfile, err);
+	} else if ((u32)err != len) {
+		TRACE("Can't read full key(req: %u, read: %d)", len, err);
+		err = -EINVAL;
+	} else {
+		err = 0;
+	}
+	filp_close(f, NULL);
+out:
+	filp_close(flock, NULL);
+
+	return err;
+}
+
+/*
+ * Enable TCP keepalive on @sock: first probe after 60s idle, then up to 5
+ * probes 5s apart.  Failures (e.g. on non-TCP sockets) are ignored.
+ */
+static inline void pcs_sock_keepalive(struct socket *sock)
+{
+	static const struct {
+		int level;
+		int optname;
+		int value;
+	} opts[] = {
+		{ SOL_SOCKET,	SO_KEEPALIVE,	1 },
+		{ SOL_TCP,	TCP_KEEPIDLE,	60 },
+		{ SOL_TCP,	TCP_KEEPCNT,	5 },
+		{ SOL_TCP,	TCP_KEEPINTVL,	5 },
+	};
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(opts); i++) {
+		int v = opts[i].value;
+
+		kernel_setsockopt(sock, opts[i].level, opts[i].optname,
+				  (char *)&v, sizeof(v));
+	}
+}
+
+/* Try to enable TCP_CORK on @sock; returns 0 on success, -1 if refused. */
+static inline int pcs_sock_cork(struct socket *sock)
+{
+	int on = 1;
+	int rc = kernel_setsockopt(sock, SOL_TCP, TCP_CORK, (char *)&on,
+				   sizeof(on));
+
+	return rc ? -1 : 0;
+}
+
+/* Set TCP_NODELAY on @sock; the result is ignored. */
+static inline void pcs_sock_nodelay(struct socket *sock)
+{
+	int on = 1;
+
+	(void)kernel_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
+				(char *)&on, sizeof(on));
+}
+
+/*
+ * Send all @size bytes of @buf on @sock.
+ *
+ * Returns 0 on success, a negative errno from kernel_sendmsg(), or -EPROTO
+ * if only part of the buffer was sent (e.g. the wait was interrupted).  This
+ * mirrors recv_buf(), so a truncated handshake message is never reported as
+ * sent.
+ */
+static int send_buf(struct socket *sock, u8 *buf, size_t size)
+{
+	struct msghdr msg = {
+		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL,
+	};
+	struct kvec iov = {
+		.iov_base = buf,
+		.iov_len = size,
+	};
+	int ret = kernel_sendmsg(sock, &msg, &iov, 1, size);
+
+	if (ret < 0)
+		return ret;
+	return (size_t)ret != size ? -EPROTO : 0;
+}
+
+/*
+ * Receive exactly @size bytes from @sock into @buf, waiting for all of them.
+ *
+ * Returns 0 on success, a negative errno from kernel_recvmsg(), or -EPROTO
+ * if fewer than @size bytes arrived (e.g. the peer closed the connection).
+ */
+static int recv_buf(struct socket *sock, u8 *buf, size_t size)
+{
+	const int flags = MSG_WAITALL | MSG_NOSIGNAL;
+	struct kvec vec = { .iov_base = buf, .iov_len = size };
+	struct msghdr hdr = { .msg_flags = flags };
+	int got;
+
+	got = kernel_recvmsg(sock, &hdr, &vec, 1, size, flags);
+	if (got < 0)
+		return got;
+	if (got != size)
+		return -EPROTO;
+	return 0;
+}
+
+/* Length of string literal @s without its terminating NUL */
+#define __str_len(s) (ARRAY_SIZE(s) - sizeof(*(s)))
+
+/* Multiple payloads are supported. They are expected to have fixed alignment in between. */
+#define PCS_RPC_AUTH_PAYLOAD_ALIGN 8
+
+/* Build version advertised in auth requests (here simply "unknown") */
+#define PCS_BUILD_VERSION "unknown"
+#define MAX_BUILD_VERSION_LENGTH 30
+
+/*
+ * Pre-built PCS_RPC_BUILD_VERSION_PAYLOAD (header followed by the version
+ * string) that send_auth_msg() copies after the digest payload of every
+ * request.  NOTE(review): never modified; could be made const.
+ */
+static struct {
+	struct pcs_rpc_payload p;
+	char build_version[MAX_BUILD_VERSION_LENGTH+1];
+} s_version_data = {
+	{
+		.len = __str_len(PCS_BUILD_VERSION),
+		.type = PCS_RPC_BUILD_VERSION_PAYLOAD,
+	},
+	.build_version = PCS_BUILD_VERSION
+};
+
+/* Bytes occupied by payload @p: its header followed by p->len data bytes. */
+static inline unsigned rpc_auth_payload_size(struct pcs_rpc_payload const *p)
+{
+	return p->len + sizeof(*p);
+}
+
+/* rpc_auth_payload_size() padded up to PCS_RPC_AUTH_PAYLOAD_ALIGN. */
+static inline unsigned rpc_auth_payload_size_aligned(struct pcs_rpc_payload const *p)
+{
+	unsigned raw = rpc_auth_payload_size(p);
+
+	return round_up(raw, PCS_RPC_AUTH_PAYLOAD_ALIGN);
+}
+
+/* Header of the payload that follows @p, located at @p's aligned end. */
+static inline struct pcs_rpc_payload *rpc_auth_payload_next(struct pcs_rpc_payload *p)
+{
+	u8 *base = (u8 *)p;
+
+	return (struct pcs_rpc_payload *)(base + rpc_auth_payload_size_aligned(p));
+}
+
+/* Payload type carrying digest-handshake data in auth messages */
+#define PCS_RPC_DIGEST_PAYLOAD 13
+
+/*
+ * Auth handshake message.  The header of the first payload is embedded at
+ * the end; its data follows it directly, and further payloads follow at
+ * PCS_RPC_AUTH_PAYLOAD_ALIGN boundaries (see rpc_auth_payload_next()).
+ */
+struct pcs_rpc_auth
+{
+	struct pcs_rpc_hdr hdr;
+
+	PCS_CLUSTER_ID_T cluster_id;	/* Cluster identity */
+	PCS_NODE_ID_T sender_id;	/* Identity of sender */
+	PCS_NODE_ID_T recipient_id;	/* Expected identity of recipient */
+	u8 sender_role;			/* Role of sender (TEST/CN/CS/MDS) */
+	u8 recipient_role;		/* Expected role of recipient */
+	u8 flags;			/* PCS_RPC_AUTH_F_* bits */
+	u8 state;			/* State of auth handshake (PCS_AUTH_*) */
+	u32 version;			/* Protocol version */
+	struct pcs_host_info host;	/* Sender's host info */
+	u32 reserved[3];
+	u32 npayloads;			/* Number of payloads in the message */
+	struct pcs_rpc_payload payload;	/* Header of the first payload */
+} __attribute__((aligned(8)));
+
+
+/* Message types: request sent by the client and the response expected back */
+#define PCS_RPC_AUTH_REQ 8
+#define PCS_RPC_AUTH_RESP (PCS_RPC_AUTH_REQ | PCS_RPC_DIRECTION)
+
+/*
+ * Build one PCS_RPC_AUTH_REQ message for handshake state @state and send it
+ * synchronously on ep's socket.
+ *
+ * The message carries two payloads, each starting at a
+ * PCS_RPC_AUTH_PAYLOAD_ALIGN boundary:
+ *  - a PCS_RPC_DIGEST_PAYLOAD with @size bytes of @data (header left zeroed
+ *    when @size is 0);
+ *  - the build version payload.
+ *
+ * Returns 0 or a negative errno.
+ */
+static int send_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
+{
+	struct pcs_rpc_engine *eng = ep->eng;
+	struct pcs_rpc_auth *au;
+	size_t msg_sz = sizeof(struct pcs_rpc_auth) +
+			round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN) +
+			rpc_auth_payload_size_aligned(&s_version_data.p);
+	struct pcs_msg *msg;
+	int err;
+
+	msg = pcs_rpc_alloc_output_msg(msg_sz);
+	if (!msg) {
+		TRACE("Can't alloc auth msg");
+		return -ENOMEM;
+	}
+
+	au = (struct pcs_rpc_auth *)msg->_inline_buffer;
+	/*
+	 * Zero the whole message first: the alignment padding between and
+	 * after the payloads goes out on the wire and must not carry stale
+	 * heap contents.
+	 */
+	memset(au, 0, msg_sz);
+	*au = (struct pcs_rpc_auth) {
+		.hdr.type = PCS_RPC_AUTH_REQ,
+		.hdr.len = msg_sz,
+		.cluster_id = eng->cluster_id,
+		.sender_id = eng->local_id,
+		.recipient_id = ep->peer_id,
+		.recipient_role = ep->peer_role,
+		.version = PCS_VERSION_CURRENT,
+		.state = state,
+		.host = eng->my_host,
+		.npayloads = 2,
+	};
+	pcs_rpc_get_new_xid(eng, &au->hdr.xid);
+
+	if (size) {
+		au->payload.type = PCS_RPC_DIGEST_PAYLOAD;
+		au->payload.len = size;
+		/*
+		 * Payload data immediately follows its header.  This matches
+		 * rpc_auth_payload_next() and recv_auth_msg(); "au + 1" would
+		 * also skip any tail padding of struct pcs_rpc_auth.
+		 */
+		memcpy(&au->payload + 1, data, size);
+	}
+	memcpy(rpc_auth_payload_next(&au->payload), &s_version_data,
+	       rpc_auth_payload_size(&s_version_data.p));
+
+	if (!(ep->flags & PCS_RPC_F_PEER_ID))
+		au->flags |= PCS_RPC_AUTH_F_VOID_RECIPIENT;
+	if (!(eng->flags & PCS_KNOWN_MYID)) {
+		au->flags |= PCS_RPC_AUTH_F_VOID_SENDER;
+		if (ep->flags & PCS_RPC_F_ACQ_ID)
+			au->flags |= PCS_RPC_AUTH_F_ACQ_SENDER;
+	}
+
+	if (!(eng->flags & PCS_KNOWN_CLUSTERID))
+		au->flags |= PCS_RPC_AUTH_F_VOID_CLUSTERID;
+
+	TRACE("state=%d, type=%d, len=%d, msg_sz: %zu",
+	      au->state, au->payload.type, au->payload.len, msg_sz);
+
+	err = send_buf(ep->conn->socket, (u8*)au, msg_sz);
+	if (err)
+		TRACE("Can't send au msg, err: %d", err);
+	pcs_free_msg(msg);
+
+	return err;
+}
+
+/*
+ * Receive one auth response on ep's socket and extract its digest payload.
+ *
+ * First the fixed part is read: struct pcs_rpc_auth followed by @size bytes
+ * of digest data, padded to PCS_RPC_AUTH_PAYLOAD_ALIGN.  It is accepted only
+ * if all of the following hold:
+ *  - it is a PCS_RPC_AUTH_RESP;
+ *  - its length is at least the fixed part and at most
+ *    ep->params.max_msg_size;
+ *  - it is in handshake state @state;
+ *  - its first payload is exactly @size bytes long.
+ * The payload is then copied to @data, and the rest of the message (e.g. the
+ * build version payload) is read and discarded.
+ *
+ * Peer-controlled oddities are only traced, never WARNed about.
+ * Returns 0 or a negative errno (-EPROTO for a malformed/unexpected message).
+ */
+static int recv_auth_msg(struct pcs_rpc *ep, void *data, size_t size, int state)
+{
+	struct pcs_rpc_auth *au;
+	size_t fixed_sz = sizeof(struct pcs_rpc_auth) +
+			  round_up(size, PCS_RPC_AUTH_PAYLOAD_ALIGN);
+	struct pcs_msg *msg;
+	int err;
+
+	msg = pcs_rpc_alloc_output_msg(fixed_sz);
+	if (!msg) {
+		TRACE("Can't alloc auth msg");
+		return -ENOMEM;
+	}
+
+	err = recv_buf(ep->conn->socket, msg->_inline_buffer, fixed_sz);
+	if (err) {
+		TRACE("Can't recv auth msg(%zu), err: %d", fixed_sz, err);
+		goto fail;
+	}
+	au = (struct pcs_rpc_auth *)msg->_inline_buffer;
+
+	/*
+	 * Fatal stream format error.  fixed_sz bytes have already been
+	 * consumed, so a shorter message means we read past its end.
+	 */
+	if (au->hdr.len < fixed_sz || au->hdr.len > ep->params.max_msg_size) {
+		TRACE("Bad message header %u %u\n", au->hdr.len, au->hdr.type);
+		err = -EPROTO;
+		goto fail;
+	}
+	/* Anything but an auth response (e.g. an error response) ends the handshake */
+	if (au->hdr.type != PCS_RPC_AUTH_RESP) {
+		TRACE("Unexpected auth msg type %u", au->hdr.type);
+		err = -EPROTO;
+		goto fail;
+	}
+
+	TRACE("state=%d, payloads:=%u, type=%d, len=%d", au->state,
+	      au->npayloads, au->payload.type, au->payload.len);
+	if (au->state != state) {
+		TRACE("Unexpected state %d, should be %d", au->state, state);
+		err = -EPROTO;
+		goto fail;
+	}
+	if (au->flags & PCS_RPC_AUTH_F_VOID_CLUSTERID)
+		TRACE("Wrong: auth void cluster");
+
+	if (au->npayloads != 2)
+		TRACE("Unexpected number of auth payloads: %u", au->npayloads);
+	if (au->payload.len != size) {
+		TRACE("Wrong auth payload %u %u, data_sz: %zu\n",
+		       au->payload.len, au->payload.type, size);
+		err = -EPROTO;
+		goto fail;
+	}
+	if (au->payload.type != PCS_RPC_DIGEST_PAYLOAD)
+		TRACE("Unexpected auth payload type %d", au->payload.type);
+	memcpy(data, &au->payload + 1, size);
+
+	/* Read and discard the rest of the message, reusing the buffer */
+	if (au->hdr.len > fixed_sz) {
+		size_t rest_sz = au->hdr.len - fixed_sz;
+
+		while (rest_sz) {
+			size_t recv_sz = min(fixed_sz, rest_sz);
+
+			err = recv_buf(ep->conn->socket, msg->_inline_buffer,
+				       recv_sz);
+			if (err) {
+				TRACE("Can't recv auth msg(%zu), err: %d",
+				      recv_sz, err);
+				goto fail;
+			}
+			rest_sz -= recv_sz;
+		}
+	}
+
+fail:
+	pcs_free_msg(msg);
+	return err;
+}
+
+/*
+ * Client side of the shared-key digest handshake on ep's connected socket:
+ *
+ *  1. send HELLO carrying md5(cluster name);
+ *  2. receive the server's salt and id; the salt must equal the one stored
+ *     in the local key file;
+ *  3. send a random client id plus HMAC(key, server id);
+ *  4. receive HMAC(key, client id) from the server and verify it;
+ *  5. send state PCS_AUTH_SRV_ACCEPT + 1 with an empty digest payload.
+ *
+ * The key material loaded from disk is wiped before returning.
+ * Returns 0 on success or a negative errno.
+ */
+static int pcs_do_auth_digest(struct pcs_rpc *ep)
+{
+	struct {
+		u8 key[DIGEST_KEY_LEN];
+		u8 salt[DIGEST_SALT_LEN];
+	} auth_cfg;
+	struct digest_hello_msg hi;
+	struct digest_srv_salt_msg slt;
+	struct digest_msg digest;
+	struct digest_hmac_msg hmac;
+	char *cluster_name = cc_from_rpc(ep->eng)->cluster_name;
+	int err;
+
+	/* Key file layout: DIGEST_KEY_LEN key bytes followed by the salt */
+	err = pcs_load_keyfile_auth(cluster_name, (u8*)&auth_cfg, sizeof(auth_cfg));
+	if (err)
+		goto out;
+
+	err = pcs_md5_hash(hi.md5_cn, cluster_name, strlen(cluster_name));
+	if (err) {
+		TRACE("Can't calculate md5 from cluster name, err: %d", err);
+		goto out;
+	}
+
+	err = send_auth_msg(ep, &hi, sizeof(hi), PCS_AUTH_SEND_HELLO);
+	if (err) {
+		TRACE("Can't send hello auth msg, err: %d", err);
+		goto out;
+	}
+
+	err = recv_auth_msg(ep, &slt, sizeof(slt), PCS_AUTH_SEND_SRV_CERT);
+	if (err) {
+		TRACE("Can't receive salt auth msg, err: %d", err);
+		goto out;
+	}
+
+	if (memcmp(slt.salt, auth_cfg.salt, sizeof(auth_cfg.salt))) {
+		TRACE("Server use different salt");
+		err = -EPROTO;
+		goto out;
+	}
+
+	get_random_bytes(digest.id, sizeof(digest.id));
+	digest.hmac.size = sizeof(digest.hmac.data);
+	err = pcs_generate_hmac(auth_cfg.key, sizeof(auth_cfg.key), slt.id,
+				sizeof(slt.id), digest.hmac.data,
+				&digest.hmac.size);
+	if (err) {
+		TRACE("HMAC generate fail %d", err);
+		goto out;
+	}
+
+	err = send_auth_msg(ep, &digest, sizeof(digest), PCS_AUTH_SEND_CN_CERT);
+	if (err) {
+		TRACE("Can't send digest msg, err: %d", err);
+		goto out;
+	}
+
+	err = recv_auth_msg(ep, &hmac, sizeof(hmac), PCS_AUTH_SRV_ACCEPT);
+	if (err) {
+		TRACE("Can't receive hmac auth msg, err: %d", err);
+		goto out;
+	}
+
+	/*
+	 * Anything but a positive "match" is a failure, so an error while
+	 * computing the HMAC can never authenticate the server.
+	 */
+	if (pcs_validate_hmac(&digest, auth_cfg.key, sizeof(auth_cfg.key),
+			      hmac.data, hmac.size) <= 0) {
+		TRACE("Received bad digest");
+		err = -EPROTO;
+		goto out;
+	}
+
+	/* Final step: announce the state after SRV_ACCEPT, without digest data */
+	err = send_auth_msg(ep, NULL, 0, PCS_AUTH_SRV_ACCEPT + 1);
+	if (err)
+		TRACE("Can't send auth srv accept msg, err: %d", err);
+out:
+	/* Do not leave the shared key and salt on the stack */
+	memzero_explicit(&auth_cfg, sizeof(auth_cfg));
+	return err;
+}
+
+/* Authentication methods understood by rpc_client_start_auth() */
+enum {
+	PCS_AUTH_DIGEST = 0,	/* shared-key digest handshake */
+};
+
+/* Run the client side of the handshake selected by @auth_type on @ep. */
+static int rpc_client_start_auth(struct pcs_rpc *ep, int auth_type)
+{
+	if (auth_type == PCS_AUTH_DIGEST)
+		return pcs_do_auth_digest(ep);
+
+	/* Only the digest handshake is implemented */
+	BUG();
+	return -EOPNOTSUPP;
+}
+
+/*
+ * Translate a PCS network address into a socket address stored at @sa.
+ *
+ * IP and RDMA addresses become an AF_INET sockaddr_in; IP6 addresses become
+ * an AF_INET6 sockaddr_in6.  The port is copied as-is, without byte-order
+ * conversion (presumably already in network order; confirm against callers).
+ * On success *@salen receives the sockaddr size and 0 is returned.  Any other
+ * address type yields -EINVAL.
+ */
+int pcs_netaddr2sockaddr(PCS_NET_ADDR_T const* addr, struct sockaddr *sa, int *salen)
+{
+	struct sockaddr_in *sin;
+	struct sockaddr_in6 *sin6;
+
+	BUG_ON(!sa);
+
+	switch (addr->type) {
+	case PCS_ADDRTYPE_IP:
+	case PCS_ADDRTYPE_RDMA:
+		sin = (struct sockaddr_in *)sa;
+		*sin = (struct sockaddr_in) {
+			.sin_family = AF_INET,
+			.sin_port = (u16)addr->port,
+		};
+		memcpy(&sin->sin_addr, addr->address, sizeof(sin->sin_addr));
+		*salen = sizeof(*sin);
+		return 0;
+	case PCS_ADDRTYPE_IP6:
+		sin6 = (struct sockaddr_in6 *)sa;
+		*sin6 = (struct sockaddr_in6) {
+			.sin6_family = AF_INET6,
+			.sin6_port = (u16)addr->port,
+		};
+		memcpy(&sin6->sin6_addr, addr->address, sizeof(sin6->sin6_addr));
+		*salen = sizeof(*sin6);
+		return 0;
+	default:
+		return -EINVAL;
+	}
+}
+
+/*
+ * Open a stream socket to the address prepared in ep->sh and bring @ep up.
+ *
+ * Must be called with ep->mutex held.  The steps are:
+ *  - allocate a pcs_sockio, create the socket and start a non-blocking
+ *    connect();
+ *  - apply keepalive and cork (or nodelay) options;
+ *  - attach the sockio to @ep;
+ *  - run the digest auth handshake synchronously in state PCS_RPC_AUTH;
+ *  - install the pcs_sk_* socket callbacks;
+ *  - enable the endpoint with pcs_rpc_enable(), which also resets
+ *    ep->retries.
+ * On any failure the endpoint is reset with pcs_rpc_reset().
+ *
+ * NOTE(review): the handshake performs blocking socket I/O while ep->mutex
+ * is held; confirm this is acceptable for other users of the mutex.
+ */
+void pcs_sockconnect_start(struct pcs_rpc *ep)
+{
+	struct pcs_sockio *sio;
+	struct sockaddr *sa = &ep->sh.sa;
+	struct socket *sock;
+	int err, alloc_max = ep->params.alloc_hdr_size;
+
+	BUG_ON(!mutex_is_locked(&ep->mutex));
+
+	/* The sockio is followed by alloc_hdr_size bytes of header space */
+	sio = kzalloc(sizeof(struct pcs_sockio) + alloc_max, GFP_NOIO);
+	if (!sio) {
+		TRACE("Can't allocate sio\n");
+		goto fail;
+	}
+
+	INIT_LIST_HEAD(&sio->write_queue);
+	iov_iter_kvec(&sio->read_iter, READ, NULL, 0, 0);
+	iov_iter_kvec(&sio->write_iter, WRITE, NULL, 0, 0);
+	sio->hdr_max = sizeof(struct pcs_rpc_hdr);
+	/* NOTE(review): PCS_SOCK_F_CORK is set again below if TCP_CORK succeeds */
+	sio->flags = sa->sa_family != AF_UNIX ? PCS_SOCK_F_CORK : 0;
+	INIT_LIST_HEAD(&sio->ioconn.list);
+
+	err = sock_create(sa->sa_family, SOCK_STREAM, 0, &sock);
+	if (err < 0) {
+		TRACE("Can't create socket: %d\n", err);
+		goto fail2;
+	}
+	pcs_clear_error(&sio->error);
+
+	/*
+	 * Non-blocking connect: -EINPROGRESS is accepted here.  Presumably the
+	 * first blocking send of the handshake then waits for the connection
+	 * to be established.
+	 */
+	err = sock->ops->connect(sock, sa, ep->sh.sa_len, O_NONBLOCK);
+	if (err != 0 && err != -EINPROGRESS) {
+		TRACE("Failed connection: %d\n", err);
+		sock_release(sock);
+		goto fail2;
+	}
+	pcs_sock_keepalive(sock);
+	if (!pcs_sock_cork(sock))
+		sio->flags |= PCS_SOCK_F_CORK;
+	else
+		pcs_sock_nodelay(sock);
+
+	TRACE(PEER_FMT " ->state:%d sock:%p\n", PEER_ARGS(ep), ep->state, sock);
+	cancel_delayed_work(&ep->timer_work);
+	ep->retries++;
+
+	/* From here on the sockio belongs to @ep (freed via ep->conn) */
+	ep->conn = &sio->ioconn;
+	sio->parent = pcs_rpc_get(ep);
+	sio->get_msg = rpc_get_hdr;
+	sio->eof = rpc_eof_cb;
+	sio->send_timeout = PCS_SIO_TIMEOUT;
+	sio->ioconn.socket = sock;
+	sio->ioconn.destruct = pcs_sock_internal_ioconn_destruct;
+	if (ep->gc)
+		list_lru_add(&ep->gc->lru, &ep->lru_link);
+
+	if (ep->flags & PCS_RPC_F_CLNT_PEER_ID)
+		ep->flags |= PCS_RPC_F_PEER_ID;
+
+	/* Synchronous handshake on the plain socket, before callbacks are hooked */
+	ep->state = PCS_RPC_AUTH;
+	err = rpc_client_start_auth(ep, PCS_AUTH_DIGEST);
+	if (err < 0) {
+		FUSE_KLOG(cc_from_rpc(ep->eng)->fc, LOG_ERR,
+			  "Authorization failed: %d", err);
+		goto fail; /* since ep->conn is initialized,
+			    * sio will be freed in pcs_rpc_reset()
+			    */
+	}
+	write_lock_bh(&sock->sk->sk_callback_lock);
+	/*
+	 * Backup original callbaks.
+	 * TCP and unix sockets do not have sk_user_data set.
+	 * So we avoid taking sk_callback_lock in callbacks,
+	 * since this seems to be able to result in performance.
+	 */
+	WARN_ON_ONCE(sock->sk->sk_user_data);
+	sio->ioconn.orig.user_data = sock->sk->sk_user_data;
+	sio->ioconn.orig.data_ready = sock->sk->sk_data_ready;
+	sio->ioconn.orig.write_space = sock->sk->sk_write_space;
+	sio->ioconn.orig.error_report = sock->sk->sk_error_report;
+
+	sock->sk->sk_sndtimeo = PCS_SIO_TIMEOUT;
+	sock->sk->sk_allocation = GFP_NOFS;
+
+	rcu_assign_sk_user_data(sock->sk, sio);
+	smp_wmb(); /* Pairs with smp_rmb() in callbacks */
+	sock->sk->sk_data_ready = pcs_sk_data_ready;
+	sock->sk->sk_write_space = pcs_sk_write_space;
+	sock->sk->sk_error_report = pcs_sk_error_report;
+	write_unlock_bh(&sock->sk->sk_callback_lock);
+
+	ep->state = PCS_RPC_APPWAIT;
+	pcs_rpc_enable(ep, 0);
+	return;
+fail2:
+	/* sio was never attached to @ep: free it directly */
+	kfree(sio);
+fail:
+	pcs_rpc_reset(ep);
+	return;
+}
diff --git a/fs/fuse/kio/pcs/pcs_sock_conn.h b/fs/fuse/kio/pcs/pcs_sock_conn.h
new file mode 100644
index 000000000000..2353177eec63
--- /dev/null
+++ b/fs/fuse/kio/pcs/pcs_sock_conn.h
@@ -0,0 +1,7 @@
+#ifndef _PCS_SOCK_CONN_H_
+#define _PCS_SOCK_CONN_H_ 1
+
+void pcs_sockconnect_start(struct pcs_rpc *ep);
+int pcs_netaddr2sockaddr(PCS_NET_ADDR_T const* addr, struct sockaddr *sa, int *salen);
+
+#endif /* _PCS_SOCK_CONN_H_ */
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 31ea54260a64..80b32415664a 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -41,7 +41,6 @@ void sio_push(struct pcs_sockio * sio)
 //// caseA: userspace close socket and wait for kernelspace
 //// caseB: kernelspace want to close socket and have to somehow
 ////	    notify about this to userspace (NEW API REQUIRED)
-static void pcs_restore_sockets(struct pcs_ioconn *ioconn);
 static void pcs_ioconn_unregister(struct pcs_ioconn *ioconn)
 {
 	if (!test_bit(PCS_IOCONN_BF_DEAD, &ioconn->flags))
@@ -476,19 +475,18 @@ int pcs_sock_queuelen(struct pcs_sockio * sio)
 	return sio->write_queue_len;
 }
 
-static void pcs_restore_sockets(struct pcs_ioconn *ioconn)
+void pcs_restore_sockets(struct pcs_ioconn *ioconn)
 {
-
-	struct sock *sk;
-
-	sk = ioconn->socket->sk;
+	struct sock *sk = ioconn->socket->sk;
 
 	write_lock_bh(&sk->sk_callback_lock);
-	rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
-	sk->sk_data_ready =   ioconn->orig.data_ready;
-	sk->sk_write_space =  ioconn->orig.write_space;
-	sk->sk_error_report = ioconn->orig.error_report;
-	//sock->sk->sk_state_change = pcs_state_chage;
+	if (sk->sk_user_data) {
+		rcu_assign_sk_user_data(sk, ioconn->orig.user_data);
+		sk->sk_data_ready = ioconn->orig.data_ready;
+		sk->sk_write_space = ioconn->orig.write_space;
+		sk->sk_error_report = ioconn->orig.error_report;
+		//sock->sk->sk_state_change = pcs_state_chage;
+	}
 	write_unlock_bh(&sk->sk_callback_lock);
 
 	sk->sk_sndtimeo = MAX_SCHEDULE_TIMEOUT;
@@ -505,17 +503,22 @@ static void sio_destroy_rcu(struct rcu_head *head)
 	kfree(sio);
 }
 
-void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
+static void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn, bool internal)
 {
 	struct pcs_sockio * sio = sio_from_ioconn(ioconn);
 
+	TRACE("Sock destruct_cb, sio: %p, internal: %d", sio, internal);
+
 	BUG_ON(sio->current_msg);
 	BUG_ON(!list_empty(&sio->write_queue));
 	BUG_ON(sio->write_queue_len);
 
 	if (ioconn->socket) {
 		pcs_restore_sockets(ioconn);
-		fput(ioconn->socket->file);
+		if (internal)
+			sock_release(ioconn->socket);
+		else
+			fput(ioconn->socket->file);
 		ioconn->socket = NULL;
 	}
 
@@ -523,6 +526,16 @@ void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn)
 	call_rcu(&sio->rcu, sio_destroy_rcu);
 }
 
+void pcs_sock_internal_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+	pcs_sock_ioconn_destruct(ioconn, true); /* in-kernel socket: freed via sock_release() */
+}
+
+void pcs_sock_external_ioconn_destruct(struct pcs_ioconn *ioconn)
+{
+	pcs_sock_ioconn_destruct(ioconn, false); /* file-backed socket: dropped via fput(socket->file) */
+}
+
 static void pcs_sk_kick_queue(struct sock *sk)
 {
 	struct pcs_sockio *sio;
@@ -539,17 +552,17 @@ static void pcs_sk_kick_queue(struct sock *sk)
 	rcu_read_unlock();
 }
 
-static void pcs_sk_data_ready(struct sock *sk)
+void pcs_sk_data_ready(struct sock *sk) /* non-static: also installed as sk_data_ready by the connect path */
 {
	pcs_sk_kick_queue(sk);
 }
-static void pcs_sk_write_space(struct sock *sk)
+void pcs_sk_write_space(struct sock *sk) /* non-static: also installed as sk_write_space by the connect path */
 {
	pcs_sk_kick_queue(sk);
 }
 
 /* TODO this call back does not look correct, sane locking/error handling is required */
-static void pcs_sk_error_report(struct sock *sk)
+void pcs_sk_error_report(struct sock *sk)
 {
 	struct pcs_sockio *sio;
 
@@ -611,7 +624,7 @@ struct pcs_sockio * pcs_sockio_init(struct socket *sock,
 	sk->sk_allocation = GFP_NOFS;
 	sio->send_timeout = PCS_SIO_TIMEOUT;
 	sio->ioconn.socket = sock;
-	sio->ioconn.destruct = pcs_sock_ioconn_destruct;
+	sio->ioconn.destruct = pcs_sock_external_ioconn_destruct;
 
 	rcu_assign_sk_user_data(sk, sio);
 	smp_wmb(); /* Pairs with smp_rmb() in callbacks */
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 112f7a060132..c0bedbd0754d 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -2,6 +2,9 @@
 #define _PCS_SOCK_IO_H_ 1
 
 #include <linux/net.h>
+#include <linux/in.h>
+#include <linux/in6.h>
+#include <linux/un.h>
 
 #include "pcs_types.h"
 ////#include "pcs_process.h"
@@ -156,6 +159,10 @@ int pcs_sock_queuelen(struct pcs_sockio * sio);
 void pcs_sock_abort(struct pcs_sockio * sio);
 void pcs_sock_error(struct pcs_sockio * sio, int error);
 
+void pcs_sk_data_ready(struct sock *sk);
+void pcs_sk_write_space(struct sock *sk);
+void pcs_sk_error_report(struct sock *sk);
+
 void pcs_sock_throttle(struct pcs_sockio * sio);
 void pcs_sock_unthrottle(struct pcs_sockio * sio);
 
@@ -183,6 +190,8 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
 	iov_iter_for_each_range(i, iov_iter_single_seg_count(i),
 			iov_iter_get_kvec_callback, vec);
 }
+void pcs_sock_internal_ioconn_destruct(struct pcs_ioconn *ioconn);
+void pcs_sock_external_ioconn_destruct(struct pcs_ioconn *ioconn);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {
diff --git a/fs/fuse/kio/pcs/pcs_types.h b/fs/fuse/kio/pcs/pcs_types.h
index 1170475c2226..1915fd047ab5 100644
--- a/fs/fuse/kio/pcs/pcs_types.h
+++ b/fs/fuse/kio/pcs/pcs_types.h
@@ -27,6 +27,7 @@ enum
 	PCS_ADDRTYPE_IP6 = 2,
 	PCS_ADDRTYPE_UNIX = 3,
 	PCS_ADDRTYPE_RDMA = 4,
+	PCS_ADDRTYPE_NETLINK = 5,
 };
 
 /* alignment makes it usable in binary protocols */


More information about the Devel mailing list