[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and control it with ssh* cmdline opts

Pavel Emelyanov xemul at parallels.com
Mon Oct 27 11:13:37 PDT 2014


On 10/27/2014 04:56 PM, Ruslan Kuprieiev wrote:
> Currently we have such security holes:
> 
> 1) All p.haul traffic goes without any encryption, which is
> not safe at all, considering that an attacker can easily peek at memory pages
> of the migrating process. Let's solve that by using an ssh tunnel, which allows
> us to easily encrypt and compress traffic.
> Compression is useful only when the connection is very slow; on fast
> networks it will only slow things down.
> Using an ssh tunnel also allows us to solve the keys/certificates management
> problem in a very common way that is familiar to any system administrator.
> 
> 2) p.haul-service binds to 0.0.0.0 and is accessible from the outside
> to anyone who tries to connect to it. So an attacker can easily connect
> to p.haul-service and migrate some malicious process to the server.
> Let's fix that by binding p.haul-service to 127.0.0.1 so it is accessible
> only from localhost.
> 
> So, basically, we perform the following actions when migrating a process
> from src to dest:
> 1. (user at dest) Start p.haul-service on localhost:12345
> 2. (user at src)  Create an ssh tunnel:
> 	ssh -NC -L 54321:localhost:12345 remote_ip

This is done by p.haul, right? What if ssh needs to ask for
a password? What would it do?

> 3. (user at src)  Connect p.haul to localhost:54321 and migrate
> 
> Conflicts:
> 	xem_rpc.py
> 
> Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
> ---
>  p.haul          | 16 ++++++++++++++--
>  p.haul-service  |  8 +++++++-
>  p_haul_iters.py |  6 +++---
>  ssh_tunnel.py   | 26 ++++++++++++++++++++++++++
>  util.py         | 10 ++++++++++
>  xem_rpc.py      | 37 ++++++++++++++++++++++++++-----------
>  6 files changed, 86 insertions(+), 17 deletions(-)
>  create mode 100644 ssh_tunnel.py
> 
> diff --git a/p.haul b/p.haul
> index b8f050e..f71a59d 100755
> --- a/p.haul
> +++ b/p.haul
> @@ -4,6 +4,11 @@ import argparse
>  import p_haul_iters as ph_iters
>  import images
>  import criu_api
> +import xem_rpc
> +import ssh_tunnel
> +import util
> +import os
> +import pwd
>  
>  # Usage idea
>  # p.haul <type> <id> <destination>
> @@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
>  parser.add_argument("--pid-root", help = "Path to tree's FS root")
>  parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
>  
> +ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
> +ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
> +ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
> +ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
> +ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
> +ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
> +
>  args = vars(parser.parse_args())
>  
>  ph_type = (args.pop("type"), args.pop("id"))
> -dst = args.pop("to")
> +dst_opts, args = util.pop_dest_opts(args)
>  
>  # Start the migration
> -worker = ph_iters.phaul_iter_worker(ph_type, dst)
> +worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
>  worker.set_options(args)
>  worker.start_migration()
> diff --git a/p.haul-service b/p.haul-service
> index 7444df5..46d917a 100755
> --- a/p.haul-service
> +++ b/p.haul-service
> @@ -3,15 +3,21 @@
>  import signal
>  import xem_rpc
>  import p_haul_service as ph_srv
> +import argparse
>  
>  if __name__ == "__main__":
> +	parser = argparse.ArgumentParser("Process HAULer service server")
> +	parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
> +
> +	args = vars(parser.parse_args())
> +
>  	sfd = None
>  	def fin(foo, bar):
>  		print "Stop by %d" % foo
>  		sfd.close()
>  
>  	print "Starting p.haul rpyc service"
> -	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
> +	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
>  
>  	# FIXME: Setup stop handlers
>  	sfd = t.get_stop_fd()
> diff --git a/p_haul_iters.py b/p_haul_iters.py
> index 0463e78..10dfb3f 100644
> --- a/p_haul_iters.py
> +++ b/p_haul_iters.py
> @@ -21,13 +21,13 @@ phaul_iter_min_size = 64
>  phaul_iter_grow_max = 10
>  
>  class phaul_iter_worker:
> -	def __init__(self, p_type, host):
> +	def __init__(self, p_type, dst_opts):
>  		self._mstat = mstats.migration_stats()
>  		self.iteration = 0
>  		self.prev_stats = None
>  
>  		print "Connecting to target host"
> -		self.th = xem_rpc.rpc_proxy(host)
> +		self.th = xem_rpc.rpc_proxy(dst_opts)
>  		self.data_sk = self.th.open_socket("datask")
>  
>  		print "Setting up local"
> @@ -42,7 +42,7 @@ class phaul_iter_worker:
>  			raise Exception("No FS driver found")
>  
>  		self.pid = self.htype.root_task_pid()
> -		self.fs.set_target_host(host)
> +		self.fs.set_target_host(dst_opts["to"])
>  
>  		print "Setting up remote"
>  		self.th.setup(p_type)
> diff --git a/ssh_tunnel.py b/ssh_tunnel.py
> new file mode 100644
> index 0000000..8ea8360
> --- /dev/null
> +++ b/ssh_tunnel.py
> @@ -0,0 +1,26 @@
> +import subprocess
> +import atexit
> +
> +def_ssh_port = 22
> +def_loc_port = 54321
> +
> +ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
> +
> +class Tunnel:
> +	def __init__(self, dst_opts):
> +		self._ssh = None
> +		self._opts = dst_opts
> +		atexit.register(self.stop)
> +
> +	def get_local_dst(self):
> +		return ("127.0.0.1", self._opts["ssh_loc_port"])
> +
> +	def start(self):
> +		cmd = ssh_tunnel_cmd.format(**self._opts)
> +		self._ssh = subprocess.Popen(cmd.split())
> +		print("SSH tunnel started")
> +
> +	def stop(self):
> +		if self._ssh:
> +			self._ssh.terminate()
> +			print ("SSH tunnel stopped")
> diff --git a/util.py b/util.py
> index 9883bef..a0bcca4 100644
> --- a/util.py
> +++ b/util.py
> @@ -1,6 +1,7 @@
>  import os
>  import fcntl
>  import errno
> +import xem_rpc
>  
>  class net_dev:
>  	def init(self):
> @@ -44,3 +45,12 @@ def makedirs(dirpath):
>  			pass
>  		else:
>  			raise
> +
> +def pop_dest_opts(opts):
> +	dest_opts = {}
> +	dest_opts["to"] = opts.pop("to")
> +	for k in opts.keys():
> +		if k.startswith("ssh"):
> +			dest_opts[k] = opts.pop(k)
> +
> +	return dest_opts, opts
> diff --git a/xem_rpc.py b/xem_rpc.py
> index f1d2c25..94d7798 100644
> --- a/xem_rpc.py
> +++ b/xem_rpc.py
> @@ -4,8 +4,10 @@ import threading
>  import traceback
>  import util
>  import struct
> +import ssh_tunnel
> +import errno
>  
> -rpc_port = 12345
> +default_rpc_port = 12345
>  rpc_sk_buf = 256
>  
>  RPC_CMD = 1
> @@ -14,6 +16,7 @@ RPC_CALL = 2
>  RPC_RESP = 1
>  RPC_EXC = 2
>  
> +CONNECT_ATTEMPTS = 2000
>  #
>  # Client
>  #
> @@ -40,8 +43,11 @@ class _rpc_proxy_caller:
>  			raise Exception("Proto resp error")
>  
>  class rpc_proxy:
> -	def __init__(self, conn, *args):
> -		self._srv = conn
> +	def __init__(self, conn_opts, *args):
> +		self._ssh = ssh_tunnel.Tunnel(conn_opts)
> +		self._srv = self._ssh.get_local_dst()
> +		self._ssh.start()
> +
>  		self._rpc_sk = self._make_sk()[0]
>  		util.set_cloexec(self._rpc_sk)
>  		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
> @@ -51,7 +57,17 @@ class rpc_proxy:
>  
>  	def _make_sk(self):
>  		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> -		sk.connect((self._srv, rpc_port))
> +		# ssh tunnel needs some time to start working, so lets
> +		# make CONNECT_ATTEMPTS attempts to connect.
> +		for n in xrange(CONNECT_ATTEMPTS):
> +			try:
> +				sk.connect(self._srv)
> +			except socket.error as e:
> +				if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
> +					raise e
> +				else:
> +					continue
> +			break
>  		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
>  		return (sk, host)
>  
> @@ -62,7 +78,6 @@ class rpc_proxy:
>  		return sk
>  
>  
> -
>  #
>  # Server
>  #
> @@ -121,9 +136,9 @@ class _rpc_server_sk:
>  			self._master.on_socket_open(sk._sk, uname)
>  
>  class _rpc_server_ask:
> -	def __init__(self):
> +	def __init__(self, port):
>  		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> -		sk.bind(("0.0.0.0", rpc_port))
> +		sk.bind(("127.0.0.1", port))
>  		sk.listen(8)
>  		self._sk = sk
>  		util.set_cloexec(self)
> @@ -146,10 +161,10 @@ class _rpc_stop_fd:
>  		mgr.stop()
>  
>  class _rpc_server_manager:
> -	def __init__(self, srv_class):
> +	def __init__(self, srv_class, port):
>  		self._srv_class = srv_class
>  		self._sk_by_name = {}
> -		self._poll_list = [_rpc_server_ask()]
> +		self._poll_list = [_rpc_server_ask(port)]
>  		self._alive = True
>  
>  	def add(self, sk):
> @@ -184,9 +199,9 @@ class _rpc_server_manager:
>  		print "RPC Service stops"
>  
>  class rpc_threaded_srv(threading.Thread):
> -	def __init__(self, srv_class):
> +	def __init__(self, srv_class, port):
>  		threading.Thread.__init__(self)
> -		self._mgr = _rpc_server_manager(srv_class)
> +		self._mgr = _rpc_server_manager(srv_class, port)
>  		self._stop_fd = None
>  
>  	def run(self):
> 



More information about the CRIU mailing list