[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and control it with ssh* cmdline opts
Ruslan Kuprieiev
kupruser at gmail.com
Mon Oct 27 12:37:03 PDT 2014
On 27.10.2014 20:25, Pavel Emelyanov wrote:
> On 10/27/2014 04:56 PM, Ruslan Kuprieiev wrote:
>> Currently we have such security holes:
>>
>> 1) All p.haul traffic is sent without any encryption, which is
>> not safe at all, considering that an attacker can easily peek at the memory
>> pages of the migrating process. Let's solve that by using an ssh tunnel, which
>> allows us to easily encrypt and compress traffic.
>> Compression is useful only when the connection is very slow; on fast
>> networks it only slows things down.
>> Using an ssh tunnel also allows us to solve the keys/certificates management
>> problem in a very common way that is familiar to any system administrator.
>>
>> 2) p.haul-service binds to 0.0.0.0 and is accessible from the outside
>> to anyone who tries to connect to it, so an attacker can easily connect
>> to p.haul-service and migrate some malicious process to the server.
>> Let's fix that by binding p.haul-service to 127.0.0.1 so it is accessible
>> only from localhost.
>>
>> So, basically, we perform the following actions when migrating a process
>> from src to dest:
>> 1. (user at dest) Start p.haul-service on localhost:12345
>> 2. (user at src) Create an ssh tunnel:
>> ssh -NC -L 54321:localhost:12345 remote_ip
>> 3. (user at src) Connect p.haul to localhost:54321 and migrate
>>
>> Conflicts:
>> xem_rpc.py
>>
>> Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
>> ---
>> p.haul | 16 ++++++++++++++--
>> p.haul-service | 8 +++++++-
>> p_haul_iters.py | 6 +++---
>> ssh_tunnel.py | 26 ++++++++++++++++++++++++++
>> util.py | 10 ++++++++++
>> xem_rpc.py | 37 ++++++++++++++++++++++++++-----------
>> 6 files changed, 86 insertions(+), 17 deletions(-)
>> create mode 100644 ssh_tunnel.py
>>
>> diff --git a/p.haul b/p.haul
>> index b8f050e..f71a59d 100755
>> --- a/p.haul
>> +++ b/p.haul
>> @@ -4,6 +4,11 @@ import argparse
>> import p_haul_iters as ph_iters
>> import images
>> import criu_api
>> +import xem_rpc
>> +import ssh_tunnel
>> +import util
>> +import os
>> +import pwd
>>
>> # Usage idea
>> # p.haul <type> <id> <destination>
>> @@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
>> parser.add_argument("--pid-root", help = "Path to tree's FS root")
>> parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
>>
>> +ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
>> +ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
>> +ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
>> +ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
>> +ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
>> +ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
>> +
>> args = vars(parser.parse_args())
>>
>> ph_type = (args.pop("type"), args.pop("id"))
>> -dst = args.pop("to")
>> +dst_opts, args = util.pop_dest_opts(args)
>>
>> # Start the migration
>> -worker = ph_iters.phaul_iter_worker(ph_type, dst)
>> +worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
>> worker.set_options(args)
>> worker.start_migration()
>> diff --git a/p.haul-service b/p.haul-service
>> index 7444df5..46d917a 100755
>> --- a/p.haul-service
>> +++ b/p.haul-service
>> @@ -3,15 +3,21 @@
>> import signal
>> import xem_rpc
>> import p_haul_service as ph_srv
>> +import argparse
>>
>> if __name__ == "__main__":
>> + parser = argparse.ArgumentParser("Process HAULer service server")
>> + parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
>> +
>> + args = vars(parser.parse_args())
>> +
>> sfd = None
>> def fin(foo, bar):
>> print "Stop by %d" % foo
>> sfd.close()
>>
>> print "Starting p.haul rpyc service"
>> - t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
>> + t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
>>
>> # FIXME: Setup stop handlers
>> sfd = t.get_stop_fd()
>> diff --git a/p_haul_iters.py b/p_haul_iters.py
>> index 0463e78..10dfb3f 100644
>> --- a/p_haul_iters.py
>> +++ b/p_haul_iters.py
>> @@ -21,13 +21,13 @@ phaul_iter_min_size = 64
>> phaul_iter_grow_max = 10
>>
>> class phaul_iter_worker:
>> - def __init__(self, p_type, host):
>> + def __init__(self, p_type, dst_opts):
>> self._mstat = mstats.migration_stats()
>> self.iteration = 0
>> self.prev_stats = None
>>
>> print "Connecting to target host"
>> - self.th = xem_rpc.rpc_proxy(host)
>> + self.th = xem_rpc.rpc_proxy(dst_opts)
>> self.data_sk = self.th.open_socket("datask")
>>
>> print "Setting up local"
>> @@ -42,7 +42,7 @@ class phaul_iter_worker:
>> raise Exception("No FS driver found")
>>
>> self.pid = self.htype.root_task_pid()
>> - self.fs.set_target_host(host)
>> + self.fs.set_target_host(dst_opts["to"])
>>
>> print "Setting up remote"
>> self.th.setup(p_type)
>> diff --git a/ssh_tunnel.py b/ssh_tunnel.py
>> new file mode 100644
>> index 0000000..8ea8360
>> --- /dev/null
>> +++ b/ssh_tunnel.py
>> @@ -0,0 +1,26 @@
>> +import subprocess
>> +import atexit
>> +
>> +def_ssh_port = 22
>> +def_loc_port = 54321
>> +
>> +ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
>> +
>> +class Tunnel:
>> + def __init__(self, dst_opts):
>> + self._ssh = None
>> + self._opts = dst_opts
>> + atexit.register(self.stop)
>> +
>> + def get_local_dst(self):
>> + return ("127.0.0.1", self._opts["ssh_loc_port"])
>> +
>> + def start(self):
>> + cmd = ssh_tunnel_cmd.format(**self._opts)
>> + self._ssh = subprocess.Popen(cmd.split())
>> + print("SSH tunnel started")
>> +
>> + def stop(self):
>> + if self._ssh:
>> + self._ssh.terminate()
>> + print ("SSH tunnel stopped")
>> diff --git a/util.py b/util.py
>> index 9883bef..a0bcca4 100644
>> --- a/util.py
>> +++ b/util.py
>> @@ -1,6 +1,7 @@
>> import os
>> import fcntl
>> import errno
>> +import xem_rpc
>>
>> class net_dev:
>> def init(self):
>> @@ -44,3 +45,12 @@ def makedirs(dirpath):
>> pass
>> else:
>> raise
>> +
>> +def pop_dest_opts(opts):
>> + dest_opts = {}
>> + dest_opts["to"] = opts.pop("to")
>> + for k in opts.keys():
>> + if k.startswith("ssh"):
>> + dest_opts[k] = opts.pop(k)
>> +
>> + return dest_opts, opts
>> diff --git a/xem_rpc.py b/xem_rpc.py
>> index f1d2c25..94d7798 100644
>> --- a/xem_rpc.py
>> +++ b/xem_rpc.py
>> @@ -4,8 +4,10 @@ import threading
>> import traceback
>> import util
>> import struct
>> +import ssh_tunnel
>> +import errno
>>
>> -rpc_port = 12345
>> +default_rpc_port = 12345
>> rpc_sk_buf = 256
>>
>> RPC_CMD = 1
>> @@ -14,6 +16,7 @@ RPC_CALL = 2
>> RPC_RESP = 1
>> RPC_EXC = 2
>>
>> +CONNECT_ATTEMPTS = 2000
>> #
>> # Client
>> #
>> @@ -40,8 +43,11 @@ class _rpc_proxy_caller:
>> raise Exception("Proto resp error")
>>
>> class rpc_proxy:
>> - def __init__(self, conn, *args):
>> - self._srv = conn
>> + def __init__(self, conn_opts, *args):
>> + self._ssh = ssh_tunnel.Tunnel(conn_opts)
> Still need an ability to do ssh-less migration ;)
Why?
>> + self._srv = self._ssh.get_local_dst()
>> + self._ssh.start()
>> +
>> self._rpc_sk = self._make_sk()[0]
>> util.set_cloexec(self._rpc_sk)
>> _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
>> @@ -51,7 +57,17 @@ class rpc_proxy:
>>
>> def _make_sk(self):
>> sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>> - sk.connect((self._srv, rpc_port))
>> + # ssh tunnel needs some time to start working, so lets
>> + # make CONNECT_ATTEMPTS attempts to connect.
> I don't like it. Can you shed more light on what's going on here?
We have no way to check whether the ssh tunnel is up other than by trying to connect.
But I should have used a connection timeout here =). Will fix.
>> + for n in xrange(CONNECT_ATTEMPTS):
>> + try:
>> + sk.connect(self._srv)
>> + except socket.error as e:
>> + if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
>> + raise e
>> + else:
>> + continue
>> + break
>> host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
>> return (sk, host)
>>
>> @@ -62,7 +78,6 @@ class rpc_proxy:
>> return sk
>>
>>
>> -
>> #
>> # Server
>> #
>> @@ -121,9 +136,9 @@ class _rpc_server_sk:
>> self._master.on_socket_open(sk._sk, uname)
>>
>> class _rpc_server_ask:
>> - def __init__(self):
>> + def __init__(self, port):
> This should go as separate patch.
Ok.
>> sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>> - sk.bind(("0.0.0.0", rpc_port))
>> + sk.bind(("127.0.0.1", port))
>> sk.listen(8)
>> self._sk = sk
>> util.set_cloexec(self)
>> @@ -146,10 +161,10 @@ class _rpc_stop_fd:
>> mgr.stop()
>>
>> class _rpc_server_manager:
>> - def __init__(self, srv_class):
>> + def __init__(self, srv_class, port):
>> self._srv_class = srv_class
>> self._sk_by_name = {}
>> - self._poll_list = [_rpc_server_ask()]
>> + self._poll_list = [_rpc_server_ask(port)]
>> self._alive = True
>>
>> def add(self, sk):
>> @@ -184,9 +199,9 @@ class _rpc_server_manager:
>> print "RPC Service stops"
>>
>> class rpc_threaded_srv(threading.Thread):
>> - def __init__(self, srv_class):
>> + def __init__(self, srv_class, port):
>> threading.Thread.__init__(self)
>> - self._mgr = _rpc_server_manager(srv_class)
>> + self._mgr = _rpc_server_manager(srv_class, port)
>> self._stop_fd = None
>>
>> def run(self):
>>
More information about the CRIU
mailing list