[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and control it with ssh* cmdline opts

Ruslan Kuprieiev kupruser at gmail.com
Mon Oct 27 12:20:55 PDT 2014


On 27.10.2014 20:13, Pavel Emelyanov wrote:
> On 10/27/2014 04:56 PM, Ruslan Kuprieiev wrote:
>> Currently we have the following security holes:
>>
>> 1) All p.haul traffic goes without any encryption, which is
>> not safe at all, considering that an attacker can easily peek at the
>> memory pages of a migrating process. Let's solve that by using an ssh
>> tunnel, which allows us to easily encrypt and compress traffic.
>> Compression is useful only when the connection is very slow; on fast
>> networks it will only slow things down.
>> Using an ssh tunnel also allows us to solve the key/certificate management
>> problem in a very common way that is familiar to any system administrator.
>>
>> 2) p.haul-service binds to 0.0.0.0 and is accessible from the outside
>> to anyone who tries to connect to it. So an attacker can easily connect
>> to p.haul-service and migrate some malicious process to the server.
>> Let's fix that by binding p.haul-service to 127.0.0.1 so it is accessible
>> only from its localhost.
>>
>> So, basically, we perform the following actions when migrating a process
>> from src to dest:
>> 1. (user at dest) Start p.haul-service on localhost:12345
>> 2. (user at src)  Create ssh tunnel:
>> 	ssh -NC -L 54321:localhost:12345 remote_ip
> This is done by p.haul, right? What if ssh will need to ask for
> a password, what would it do?

Good system administrators don't use passwords, they use keys =).
http://www.linuxproblem.org/art_9.html

>> 3. (user at src)  Connect p.haul to localhost:54321 and migrate
>>
>> Conflicts:
>> 	xem_rpc.py
>>
>> Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
>> ---
>>   p.haul          | 16 ++++++++++++++--
>>   p.haul-service  |  8 +++++++-
>>   p_haul_iters.py |  6 +++---
>>   ssh_tunnel.py   | 26 ++++++++++++++++++++++++++
>>   util.py         | 10 ++++++++++
>>   xem_rpc.py      | 37 ++++++++++++++++++++++++++-----------
>>   6 files changed, 86 insertions(+), 17 deletions(-)
>>   create mode 100644 ssh_tunnel.py
>>
>> diff --git a/p.haul b/p.haul
>> index b8f050e..f71a59d 100755
>> --- a/p.haul
>> +++ b/p.haul
>> @@ -4,6 +4,11 @@ import argparse
>>   import p_haul_iters as ph_iters
>>   import images
>>   import criu_api
>> +import xem_rpc
>> +import ssh_tunnel
>> +import util
>> +import os
>> +import pwd
>>   
>>   # Usage idea
>>   # p.haul <type> <id> <destination>
>> @@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
>>   parser.add_argument("--pid-root", help = "Path to tree's FS root")
>>   parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
>>   
>> +ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
>> +ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
>> +ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
>> +ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
>> +ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
>> +ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
>> +
>>   args = vars(parser.parse_args())
>>   
>>   ph_type = (args.pop("type"), args.pop("id"))
>> -dst = args.pop("to")
>> +dst_opts, args = util.pop_dest_opts(args)
>>   
>>   # Start the migration
>> -worker = ph_iters.phaul_iter_worker(ph_type, dst)
>> +worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
>>   worker.set_options(args)
>>   worker.start_migration()
>> diff --git a/p.haul-service b/p.haul-service
>> index 7444df5..46d917a 100755
>> --- a/p.haul-service
>> +++ b/p.haul-service
>> @@ -3,15 +3,21 @@
>>   import signal
>>   import xem_rpc
>>   import p_haul_service as ph_srv
>> +import argparse
>>   
>>   if __name__ == "__main__":
>> +	parser = argparse.ArgumentParser("Process HAULer service server")
>> +	parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
>> +
>> +	args = vars(parser.parse_args())
>> +
>>   	sfd = None
>>   	def fin(foo, bar):
>>   		print "Stop by %d" % foo
>>   		sfd.close()
>>   
>>   	print "Starting p.haul rpyc service"
>> -	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
>> +	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
>>   
>>   	# FIXME: Setup stop handlers
>>   	sfd = t.get_stop_fd()
>> diff --git a/p_haul_iters.py b/p_haul_iters.py
>> index 0463e78..10dfb3f 100644
>> --- a/p_haul_iters.py
>> +++ b/p_haul_iters.py
>> @@ -21,13 +21,13 @@ phaul_iter_min_size = 64
>>   phaul_iter_grow_max = 10
>>   
>>   class phaul_iter_worker:
>> -	def __init__(self, p_type, host):
>> +	def __init__(self, p_type, dst_opts):
>>   		self._mstat = mstats.migration_stats()
>>   		self.iteration = 0
>>   		self.prev_stats = None
>>   
>>   		print "Connecting to target host"
>> -		self.th = xem_rpc.rpc_proxy(host)
>> +		self.th = xem_rpc.rpc_proxy(dst_opts)
>>   		self.data_sk = self.th.open_socket("datask")
>>   
>>   		print "Setting up local"
>> @@ -42,7 +42,7 @@ class phaul_iter_worker:
>>   			raise Exception("No FS driver found")
>>   
>>   		self.pid = self.htype.root_task_pid()
>> -		self.fs.set_target_host(host)
>> +		self.fs.set_target_host(dst_opts["to"])
>>   
>>   		print "Setting up remote"
>>   		self.th.setup(p_type)
>> diff --git a/ssh_tunnel.py b/ssh_tunnel.py
>> new file mode 100644
>> index 0000000..8ea8360
>> --- /dev/null
>> +++ b/ssh_tunnel.py
>> @@ -0,0 +1,26 @@
>> +import subprocess
>> +import atexit
>> +
>> +def_ssh_port = 22
>> +def_loc_port = 54321
>> +
>> +ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
>> +
>> +class Tunnel:
>> +	def __init__(self, dst_opts):
>> +		self._ssh = None
>> +		self._opts = dst_opts
>> +		atexit.register(self.stop)
>> +
>> +	def get_local_dst(self):
>> +		return ("127.0.0.1", self._opts["ssh_loc_port"])
>> +
>> +	def start(self):
>> +		cmd = ssh_tunnel_cmd.format(**self._opts)
>> +		self._ssh = subprocess.Popen(cmd.split())
>> +		print("SSH tunnel started")
>> +
>> +	def stop(self):
>> +		if self._ssh:
>> +			self._ssh.terminate()
>> +			print ("SSH tunnel stopped")
>> diff --git a/util.py b/util.py
>> index 9883bef..a0bcca4 100644
>> --- a/util.py
>> +++ b/util.py
>> @@ -1,6 +1,7 @@
>>   import os
>>   import fcntl
>>   import errno
>> +import xem_rpc
>>   
>>   class net_dev:
>>   	def init(self):
>> @@ -44,3 +45,12 @@ def makedirs(dirpath):
>>   			pass
>>   		else:
>>   			raise
>> +
>> +def pop_dest_opts(opts):
>> +	dest_opts = {}
>> +	dest_opts["to"] = opts.pop("to")
>> +	for k in opts.keys():
>> +		if k.startswith("ssh"):
>> +			dest_opts[k] = opts.pop(k)
>> +
>> +	return dest_opts, opts
>> diff --git a/xem_rpc.py b/xem_rpc.py
>> index f1d2c25..94d7798 100644
>> --- a/xem_rpc.py
>> +++ b/xem_rpc.py
>> @@ -4,8 +4,10 @@ import threading
>>   import traceback
>>   import util
>>   import struct
>> +import ssh_tunnel
>> +import errno
>>   
>> -rpc_port = 12345
>> +default_rpc_port = 12345
>>   rpc_sk_buf = 256
>>   
>>   RPC_CMD = 1
>> @@ -14,6 +16,7 @@ RPC_CALL = 2
>>   RPC_RESP = 1
>>   RPC_EXC = 2
>>   
>> +CONNECT_ATTEMPTS = 2000
>>   #
>>   # Client
>>   #
>> @@ -40,8 +43,11 @@ class _rpc_proxy_caller:
>>   			raise Exception("Proto resp error")
>>   
>>   class rpc_proxy:
>> -	def __init__(self, conn, *args):
>> -		self._srv = conn
>> +	def __init__(self, conn_opts, *args):
>> +		self._ssh = ssh_tunnel.Tunnel(conn_opts)
>> +		self._srv = self._ssh.get_local_dst()
>> +		self._ssh.start()
>> +
>>   		self._rpc_sk = self._make_sk()[0]
>>   		util.set_cloexec(self._rpc_sk)
>>   		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
>> @@ -51,7 +57,17 @@ class rpc_proxy:
>>   
>>   	def _make_sk(self):
>>   		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>> -		sk.connect((self._srv, rpc_port))
>> +		# ssh tunnel needs some time to start working, so lets
>> +		# make CONNECT_ATTEMPTS attempts to connect.
>> +		for n in xrange(CONNECT_ATTEMPTS):
>> +			try:
>> +				sk.connect(self._srv)
>> +			except socket.error as e:
>> +				if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
>> +					raise e
>> +				else:
>> +					continue
>> +			break
>>   		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
>>   		return (sk, host)
>>   
>> @@ -62,7 +78,6 @@ class rpc_proxy:
>>   		return sk
>>   
>>   
>> -
>>   #
>>   # Server
>>   #
>> @@ -121,9 +136,9 @@ class _rpc_server_sk:
>>   			self._master.on_socket_open(sk._sk, uname)
>>   
>>   class _rpc_server_ask:
>> -	def __init__(self):
>> +	def __init__(self, port):
>>   		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>> -		sk.bind(("0.0.0.0", rpc_port))
>> +		sk.bind(("127.0.0.1", port))
>>   		sk.listen(8)
>>   		self._sk = sk
>>   		util.set_cloexec(self)
>> @@ -146,10 +161,10 @@ class _rpc_stop_fd:
>>   		mgr.stop()
>>   
>>   class _rpc_server_manager:
>> -	def __init__(self, srv_class):
>> +	def __init__(self, srv_class, port):
>>   		self._srv_class = srv_class
>>   		self._sk_by_name = {}
>> -		self._poll_list = [_rpc_server_ask()]
>> +		self._poll_list = [_rpc_server_ask(port)]
>>   		self._alive = True
>>   
>>   	def add(self, sk):
>> @@ -184,9 +199,9 @@ class _rpc_server_manager:
>>   		print "RPC Service stops"
>>   
>>   class rpc_threaded_srv(threading.Thread):
>> -	def __init__(self, srv_class):
>> +	def __init__(self, srv_class, port):
>>   		threading.Thread.__init__(self)
>> -		self._mgr = _rpc_server_manager(srv_class)
>> +		self._mgr = _rpc_server_manager(srv_class, port)
>>   		self._stop_fd = None
>>   
>>   	def run(self):
>>



More information about the CRIU mailing list