[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and control it with ssh* cmdline opts
Pavel Emelyanov
xemul at parallels.com
Mon Oct 27 11:25:06 PDT 2014
On 10/27/2014 04:56 PM, Ruslan Kuprieiev wrote:
> Currently we have such security holes:
>
> 1) All p.haul traffic is sent without any encryption, which is
> not safe at all, considering that an attacker can easily peek at the memory
> pages of the migrating process. Let's solve that by using an ssh tunnel,
> which allows us to easily encrypt and compress traffic.
> Compression is useful only when the connection is very slow; on fast
> networks it will only slow things down.
> Using an ssh tunnel also allows us to solve the key/certificate management
> problem in a very common way that is familiar to any system administrator.
>
> 2) p.haul-service binds to 0.0.0.0 and is accessible from the outside
> to anyone who tries to connect to it. So an attacker can easily connect
> to p.haul-service and migrate some malicious process to the server.
> Let's fix that by binding p.haul-service to 127.0.0.1 so it is accessible
> only from localhost.
>
> So, basically, we perform the following actions when migrating a process
> from src to dest:
> 1. (user at dest) Start p.haul-service on localhost:12345
> 2. (user at src) Create an ssh tunnel:
> ssh -NC -L 54321:localhost:12345 remote_ip
> 3. (user at src) Connect p.haul to localhost:54321 and migrate
>
> Conflicts:
> xem_rpc.py
>
> Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
> ---
> p.haul | 16 ++++++++++++++--
> p.haul-service | 8 +++++++-
> p_haul_iters.py | 6 +++---
> ssh_tunnel.py | 26 ++++++++++++++++++++++++++
> util.py | 10 ++++++++++
> xem_rpc.py | 37 ++++++++++++++++++++++++++-----------
> 6 files changed, 86 insertions(+), 17 deletions(-)
> create mode 100644 ssh_tunnel.py
>
> diff --git a/p.haul b/p.haul
> index b8f050e..f71a59d 100755
> --- a/p.haul
> +++ b/p.haul
> @@ -4,6 +4,11 @@ import argparse
> import p_haul_iters as ph_iters
> import images
> import criu_api
> +import xem_rpc
> +import ssh_tunnel
> +import util
> +import os
> +import pwd
>
> # Usage idea
> # p.haul <type> <id> <destination>
> @@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
> parser.add_argument("--pid-root", help = "Path to tree's FS root")
> parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
>
> +ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
> +ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
> +ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
> +ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
> +ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
> +ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
> +
> args = vars(parser.parse_args())
>
> ph_type = (args.pop("type"), args.pop("id"))
> -dst = args.pop("to")
> +dst_opts, args = util.pop_dest_opts(args)
>
> # Start the migration
> -worker = ph_iters.phaul_iter_worker(ph_type, dst)
> +worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
> worker.set_options(args)
> worker.start_migration()
> diff --git a/p.haul-service b/p.haul-service
> index 7444df5..46d917a 100755
> --- a/p.haul-service
> +++ b/p.haul-service
> @@ -3,15 +3,21 @@
> import signal
> import xem_rpc
> import p_haul_service as ph_srv
> +import argparse
>
> if __name__ == "__main__":
> + parser = argparse.ArgumentParser("Process HAULer service server")
> + parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
> +
> + args = vars(parser.parse_args())
> +
> sfd = None
> def fin(foo, bar):
> print "Stop by %d" % foo
> sfd.close()
>
> print "Starting p.haul rpyc service"
> - t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
> + t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
>
> # FIXME: Setup stop handlers
> sfd = t.get_stop_fd()
> diff --git a/p_haul_iters.py b/p_haul_iters.py
> index 0463e78..10dfb3f 100644
> --- a/p_haul_iters.py
> +++ b/p_haul_iters.py
> @@ -21,13 +21,13 @@ phaul_iter_min_size = 64
> phaul_iter_grow_max = 10
>
> class phaul_iter_worker:
> - def __init__(self, p_type, host):
> + def __init__(self, p_type, dst_opts):
> self._mstat = mstats.migration_stats()
> self.iteration = 0
> self.prev_stats = None
>
> print "Connecting to target host"
> - self.th = xem_rpc.rpc_proxy(host)
> + self.th = xem_rpc.rpc_proxy(dst_opts)
> self.data_sk = self.th.open_socket("datask")
>
> print "Setting up local"
> @@ -42,7 +42,7 @@ class phaul_iter_worker:
> raise Exception("No FS driver found")
>
> self.pid = self.htype.root_task_pid()
> - self.fs.set_target_host(host)
> + self.fs.set_target_host(dst_opts["to"])
>
> print "Setting up remote"
> self.th.setup(p_type)
> diff --git a/ssh_tunnel.py b/ssh_tunnel.py
> new file mode 100644
> index 0000000..8ea8360
> --- /dev/null
> +++ b/ssh_tunnel.py
> @@ -0,0 +1,26 @@
> +import subprocess
> +import atexit
> +
> +def_ssh_port = 22
> +def_loc_port = 54321
> +
> +ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
> +
> +class Tunnel:
> + def __init__(self, dst_opts):
> + self._ssh = None
> + self._opts = dst_opts
> + atexit.register(self.stop)
> +
> + def get_local_dst(self):
> + return ("127.0.0.1", self._opts["ssh_loc_port"])
> +
> + def start(self):
> + cmd = ssh_tunnel_cmd.format(**self._opts)
> + self._ssh = subprocess.Popen(cmd.split())
> + print("SSH tunnel started")
> +
> + def stop(self):
> + if self._ssh:
> + self._ssh.terminate()
> + print ("SSH tunnel stopped")
> diff --git a/util.py b/util.py
> index 9883bef..a0bcca4 100644
> --- a/util.py
> +++ b/util.py
> @@ -1,6 +1,7 @@
> import os
> import fcntl
> import errno
> +import xem_rpc
>
> class net_dev:
> def init(self):
> @@ -44,3 +45,12 @@ def makedirs(dirpath):
> pass
> else:
> raise
> +
> +def pop_dest_opts(opts):
> + dest_opts = {}
> + dest_opts["to"] = opts.pop("to")
> + for k in opts.keys():
> + if k.startswith("ssh"):
> + dest_opts[k] = opts.pop(k)
> +
> + return dest_opts, opts
> diff --git a/xem_rpc.py b/xem_rpc.py
> index f1d2c25..94d7798 100644
> --- a/xem_rpc.py
> +++ b/xem_rpc.py
> @@ -4,8 +4,10 @@ import threading
> import traceback
> import util
> import struct
> +import ssh_tunnel
> +import errno
>
> -rpc_port = 12345
> +default_rpc_port = 12345
> rpc_sk_buf = 256
>
> RPC_CMD = 1
> @@ -14,6 +16,7 @@ RPC_CALL = 2
> RPC_RESP = 1
> RPC_EXC = 2
>
> +CONNECT_ATTEMPTS = 2000
> #
> # Client
> #
> @@ -40,8 +43,11 @@ class _rpc_proxy_caller:
> raise Exception("Proto resp error")
>
> class rpc_proxy:
> - def __init__(self, conn, *args):
> - self._srv = conn
> + def __init__(self, conn_opts, *args):
> + self._ssh = ssh_tunnel.Tunnel(conn_opts)
We still need the ability to do ssh-less migration ;)
> + self._srv = self._ssh.get_local_dst()
> + self._ssh.start()
> +
> self._rpc_sk = self._make_sk()[0]
> util.set_cloexec(self._rpc_sk)
> _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
> @@ -51,7 +57,17 @@ class rpc_proxy:
>
> def _make_sk(self):
> sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> - sk.connect((self._srv, rpc_port))
> + # ssh tunnel needs some time to start working, so lets
> + # make CONNECT_ATTEMPTS attempts to connect.
I don't like it. Can you shed more light on what's going on here?
> + for n in xrange(CONNECT_ATTEMPTS):
> + try:
> + sk.connect(self._srv)
> + except socket.error as e:
> + if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
> + raise e
> + else:
> + continue
> + break
> host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
> return (sk, host)
>
> @@ -62,7 +78,6 @@ class rpc_proxy:
> return sk
>
>
> -
> #
> # Server
> #
> @@ -121,9 +136,9 @@ class _rpc_server_sk:
> self._master.on_socket_open(sk._sk, uname)
>
> class _rpc_server_ask:
> - def __init__(self):
> + def __init__(self, port):
This should go in a separate patch.
> sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> - sk.bind(("0.0.0.0", rpc_port))
> + sk.bind(("127.0.0.1", port))
> sk.listen(8)
> self._sk = sk
> util.set_cloexec(self)
> @@ -146,10 +161,10 @@ class _rpc_stop_fd:
> mgr.stop()
>
> class _rpc_server_manager:
> - def __init__(self, srv_class):
> + def __init__(self, srv_class, port):
> self._srv_class = srv_class
> self._sk_by_name = {}
> - self._poll_list = [_rpc_server_ask()]
> + self._poll_list = [_rpc_server_ask(port)]
> self._alive = True
>
> def add(self, sk):
> @@ -184,9 +199,9 @@ class _rpc_server_manager:
> print "RPC Service stops"
>
> class rpc_threaded_srv(threading.Thread):
> - def __init__(self, srv_class):
> + def __init__(self, srv_class, port):
> threading.Thread.__init__(self)
> - self._mgr = _rpc_server_manager(srv_class)
> + self._mgr = _rpc_server_manager(srv_class, port)
> self._stop_fd = None
>
> def run(self):
>
More information about the CRIU
mailing list