[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and controll it with ssh* cmdline opts

Ruslan Kuprieiev kupruser at gmail.com
Mon Oct 27 05:56:27 PDT 2014


Currently we have such security holes:

1) All p.haul traffic goes without any encryption, which is
not safe at all, cosidering that attacker can easily peek memory pages
of migrating process. Lets solve that by using ssh tunnel which allows
us to easily encrypt and compress traffic.
Compressing is useful only when connection is very slow, but will only
slow down things on fast networks.
Using ssh tunnel also allows us to solve keys\certificates management
problem in a very common way that is familiar to any system administrator.

2) p.haul-service binds to 0.0.0.0 and is accesible from the outside
for anyone who is trying to connect to it. So attacket can easily connect
to p.haul-service and migrate some malicious process to the server.
Lets fix that by binding p.haul-service to 127.0.0.1 so it is accessible
only from its localhost.

So, basically, we perform following actions when migrating process
from src to dest:
1. (user at dest) Start p.haul-service on localhost:12345
2. (user at src)  Create ssh tunnel:
	ssh -NC 54321:localhost:12345 remote_ip
3. (user at src)  Connect p.haul to localhost:54321 and migrate

Conflicts:
	xem_rpc.py

Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
 p.haul          | 16 ++++++++++++++--
 p.haul-service  |  8 +++++++-
 p_haul_iters.py |  6 +++---
 ssh_tunnel.py   | 26 ++++++++++++++++++++++++++
 util.py         | 10 ++++++++++
 xem_rpc.py      | 37 ++++++++++++++++++++++++++-----------
 6 files changed, 86 insertions(+), 17 deletions(-)
 create mode 100644 ssh_tunnel.py

diff --git a/p.haul b/p.haul
index b8f050e..f71a59d 100755
--- a/p.haul
+++ b/p.haul
@@ -4,6 +4,11 @@ import argparse
 import p_haul_iters as ph_iters
 import images
 import criu_api
+import xem_rpc
+import ssh_tunnel
+import util
+import os
+import pwd
 
 # Usage idea
 # p.haul <type> <id> <destination>
@@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
 parser.add_argument("--pid-root", help = "Path to tree's FS root")
 parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
 
+ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
+ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
+ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
+ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
+ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
+ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
+
 args = vars(parser.parse_args())
 
 ph_type = (args.pop("type"), args.pop("id"))
-dst = args.pop("to")
+dst_opts, args = util.pop_dest_opts(args)
 
 # Start the migration
-worker = ph_iters.phaul_iter_worker(ph_type, dst)
+worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
 worker.set_options(args)
 worker.start_migration()
diff --git a/p.haul-service b/p.haul-service
index 7444df5..46d917a 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -3,15 +3,21 @@
 import signal
 import xem_rpc
 import p_haul_service as ph_srv
+import argparse
 
 if __name__ == "__main__":
+	parser = argparse.ArgumentParser("Process HAULer service server")
+	parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
+
+	args = vars(parser.parse_args())
+
 	sfd = None
 	def fin(foo, bar):
 		print "Stop by %d" % foo
 		sfd.close()
 
 	print "Starting p.haul rpyc service"
-	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
+	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
 
 	# FIXME: Setup stop handlers
 	sfd = t.get_stop_fd()
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 0463e78..10dfb3f 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -21,13 +21,13 @@ phaul_iter_min_size = 64
 phaul_iter_grow_max = 10
 
 class phaul_iter_worker:
-	def __init__(self, p_type, host):
+	def __init__(self, p_type, dst_opts):
 		self._mstat = mstats.migration_stats()
 		self.iteration = 0
 		self.prev_stats = None
 
 		print "Connecting to target host"
-		self.th = xem_rpc.rpc_proxy(host)
+		self.th = xem_rpc.rpc_proxy(dst_opts)
 		self.data_sk = self.th.open_socket("datask")
 
 		print "Setting up local"
@@ -42,7 +42,7 @@ class phaul_iter_worker:
 			raise Exception("No FS driver found")
 
 		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host)
+		self.fs.set_target_host(dst_opts["to"])
 
 		print "Setting up remote"
 		self.th.setup(p_type)
diff --git a/ssh_tunnel.py b/ssh_tunnel.py
new file mode 100644
index 0000000..8ea8360
--- /dev/null
+++ b/ssh_tunnel.py
@@ -0,0 +1,26 @@
+import subprocess
+import atexit
+
+def_ssh_port = 22
+def_loc_port = 54321
+
+ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
+
+class Tunnel:
+	def __init__(self, dst_opts):
+		self._ssh = None
+		self._opts = dst_opts
+		atexit.register(self.stop)
+
+	def get_local_dst(self):
+		return ("127.0.0.1", self._opts["ssh_loc_port"])
+
+	def start(self):
+		cmd = ssh_tunnel_cmd.format(**self._opts)
+		self._ssh = subprocess.Popen(cmd.split())
+		print("SSH tunnel started")
+
+	def stop(self):
+		if self._ssh:
+			self._ssh.terminate()
+			print ("SSH tunnel stopped")
diff --git a/util.py b/util.py
index 9883bef..a0bcca4 100644
--- a/util.py
+++ b/util.py
@@ -1,6 +1,7 @@
 import os
 import fcntl
 import errno
+import xem_rpc
 
 class net_dev:
 	def init(self):
@@ -44,3 +45,12 @@ def makedirs(dirpath):
 			pass
 		else:
 			raise
+
+def pop_dest_opts(opts):
+	dest_opts = {}
+	dest_opts["to"] = opts.pop("to")
+	for k in opts.keys():
+		if k.startswith("ssh"):
+			dest_opts[k] = opts.pop(k)
+
+	return dest_opts, opts
diff --git a/xem_rpc.py b/xem_rpc.py
index f1d2c25..94d7798 100644
--- a/xem_rpc.py
+++ b/xem_rpc.py
@@ -4,8 +4,10 @@ import threading
 import traceback
 import util
 import struct
+import ssh_tunnel
+import errno
 
-rpc_port = 12345
+default_rpc_port = 12345
 rpc_sk_buf = 256
 
 RPC_CMD = 1
@@ -14,6 +16,7 @@ RPC_CALL = 2
 RPC_RESP = 1
 RPC_EXC = 2
 
+CONNECT_ATTEMPTS = 2000
 #
 # Client
 #
@@ -40,8 +43,11 @@ class _rpc_proxy_caller:
 			raise Exception("Proto resp error")
 
 class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
+	def __init__(self, conn_opts, *args):
+		self._ssh = ssh_tunnel.Tunnel(conn_opts)
+		self._srv = self._ssh.get_local_dst()
+		self._ssh.start()
+
 		self._rpc_sk = self._make_sk()[0]
 		util.set_cloexec(self._rpc_sk)
 		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
@@ -51,7 +57,17 @@ class rpc_proxy:
 
 	def _make_sk(self):
 		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect((self._srv, rpc_port))
+		# ssh tunnel needs some time to start working, so lets
+		# make CONNECT_ATTEMPTS attempts to connect.
+		for n in xrange(CONNECT_ATTEMPTS):
+			try:
+				sk.connect(self._srv)
+			except socket.error as e:
+				if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
+					raise e
+				else:
+					continue
+			break
 		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
 		return (sk, host)
 
@@ -62,7 +78,6 @@ class rpc_proxy:
 		return sk
 
 
-
 #
 # Server
 #
@@ -121,9 +136,9 @@ class _rpc_server_sk:
 			self._master.on_socket_open(sk._sk, uname)
 
 class _rpc_server_ask:
-	def __init__(self):
+	def __init__(self, port):
 		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(("0.0.0.0", rpc_port))
+		sk.bind(("127.0.0.1", port))
 		sk.listen(8)
 		self._sk = sk
 		util.set_cloexec(self)
@@ -146,10 +161,10 @@ class _rpc_stop_fd:
 		mgr.stop()
 
 class _rpc_server_manager:
-	def __init__(self, srv_class):
+	def __init__(self, srv_class, port):
 		self._srv_class = srv_class
 		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask()]
+		self._poll_list = [_rpc_server_ask(port)]
 		self._alive = True
 
 	def add(self, sk):
@@ -184,9 +199,9 @@ class _rpc_server_manager:
 		print "RPC Service stops"
 
 class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class):
+	def __init__(self, srv_class, port):
 		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class)
+		self._mgr = _rpc_server_manager(srv_class, port)
 		self._stop_fd = None
 
 	def run(self):
-- 
1.9.3



More information about the CRIU mailing list