[CRIU] [PATCH 2/2] p.haul: use ssh tunneling and controll it with ssh* cmdline opts
Ruslan Kuprieiev
kupruser at gmail.com
Thu Oct 23 11:57:33 PDT 2014
Currently we have such security holes:
1) All p.haul traffic goes without any encryption, which is
not safe at all, cosidering that attacker can easily peek memory pages
of migrating process. Lets solve that by using ssh tunnel which allows
us to easily encrypt and compress traffic.
Compressing is useful only when connection is very slow, but will only
slow down things on fast networks.
Using ssh tunnel also allows us to solve keys\certificates management
problem in a very common way that is familiar to any system administrator.
2) p.haul-service binds to 0.0.0.0 and is accesible from the outside
for anyone who is trying to connect to it. So attacket can easily connect
to p.haul-service and migrate some malicious process to the server.
Lets fix that by binding p.haul-service to 127.0.0.1 so it is accessible
only from its localhost.
So, basically, we perform following actions when migrating process
from src to dest:
1. (user at dest) Start p.haul-service on localhost:12345
2. (user at src) Create ssh tunnel:
ssh -NC 54321:localhost:12345 remote_ip
3. (user at src) Connect p.haul to localhost:54321 and migrate
Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
p.haul | 16 ++++++++++++++--
p.haul-service | 8 +++++++-
p_haul_iters.py | 6 +++---
ssh_tunnel.py | 26 ++++++++++++++++++++++++++
util.py | 10 ++++++++++
xem_rpc.py | 37 ++++++++++++++++++++++++++-----------
6 files changed, 86 insertions(+), 17 deletions(-)
create mode 100644 ssh_tunnel.py
diff --git a/p.haul b/p.haul
index b8f050e..f71a59d 100755
--- a/p.haul
+++ b/p.haul
@@ -4,6 +4,11 @@ import argparse
import p_haul_iters as ph_iters
import images
import criu_api
+import xem_rpc
+import ssh_tunnel
+import util
+import os
+import pwd
# Usage idea
# p.haul <type> <id> <destination>
@@ -28,12 +33,19 @@ parser.add_argument("--img-path", help = "Dirctory where to put images", default
parser.add_argument("--pid-root", help = "Path to tree's FS root")
parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
+ssh = parser.add_argument_group("ssh tunnel", "CMD to establish ssh tunnel: " + ssh_tunnel.ssh_tunnel_cmd)
+ssh.add_argument("--ssh-comp", help = "Use compression in the ssh tunnel", default = "", action = "store_const", const = "C")
+ssh.add_argument("--ssh-user", help = "Remote username", type = str, default = pwd.getpwuid(os.getuid()).pw_name)
+ssh.add_argument("--ssh-port", help = "Port to connect to ssh on the remote host", default = ssh_tunnel.def_ssh_port, type = int)
+ssh.add_argument("--ssh-loc-port", help = "Local port to use for forwarding", default = ssh_tunnel.def_loc_port, type = int)
+ssh.add_argument("--ssh-rem-port", help = "Remote port to use for forwarding (p.haul-service port)", default = xem_rpc.default_rpc_port, type = int)
+
args = vars(parser.parse_args())
ph_type = (args.pop("type"), args.pop("id"))
-dst = args.pop("to")
+dst_opts, args = util.pop_dest_opts(args)
# Start the migration
-worker = ph_iters.phaul_iter_worker(ph_type, dst)
+worker = ph_iters.phaul_iter_worker(ph_type, dst_opts)
worker.set_options(args)
worker.start_migration()
diff --git a/p.haul-service b/p.haul-service
index 7444df5..46d917a 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -3,15 +3,21 @@
import signal
import xem_rpc
import p_haul_service as ph_srv
+import argparse
if __name__ == "__main__":
+ parser = argparse.ArgumentParser("Process HAULer service server")
+ parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
+
+ args = vars(parser.parse_args())
+
sfd = None
def fin(foo, bar):
print "Stop by %d" % foo
sfd.close()
print "Starting p.haul rpyc service"
- t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
+ t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
# FIXME: Setup stop handlers
sfd = t.get_stop_fd()
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 0463e78..10dfb3f 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -21,13 +21,13 @@ phaul_iter_min_size = 64
phaul_iter_grow_max = 10
class phaul_iter_worker:
- def __init__(self, p_type, host):
+ def __init__(self, p_type, dst_opts):
self._mstat = mstats.migration_stats()
self.iteration = 0
self.prev_stats = None
print "Connecting to target host"
- self.th = xem_rpc.rpc_proxy(host)
+ self.th = xem_rpc.rpc_proxy(dst_opts)
self.data_sk = self.th.open_socket("datask")
print "Setting up local"
@@ -42,7 +42,7 @@ class phaul_iter_worker:
raise Exception("No FS driver found")
self.pid = self.htype.root_task_pid()
- self.fs.set_target_host(host)
+ self.fs.set_target_host(dst_opts["to"])
print "Setting up remote"
self.th.setup(p_type)
diff --git a/ssh_tunnel.py b/ssh_tunnel.py
new file mode 100644
index 0000000..8ea8360
--- /dev/null
+++ b/ssh_tunnel.py
@@ -0,0 +1,26 @@
+import subprocess
+import atexit
+
+def_ssh_port = 22
+def_loc_port = 54321
+
+ssh_tunnel_cmd = "ssh -N{ssh_comp} -L {ssh_loc_port}:localhost:{ssh_rem_port} -p {ssh_port} {ssh_user}@{to}"
+
+class Tunnel:
+ def __init__(self, dst_opts):
+ self._ssh = None
+ self._opts = dst_opts
+ atexit.register(self.stop)
+
+ def get_local_dst(self):
+ return ("127.0.0.1", self._opts["ssh_loc_port"])
+
+ def start(self):
+ cmd = ssh_tunnel_cmd.format(**self._opts)
+ self._ssh = subprocess.Popen(cmd.split())
+ print("SSH tunnel started")
+
+ def stop(self):
+ if self._ssh:
+ self._ssh.terminate()
+ print ("SSH tunnel stopped")
diff --git a/util.py b/util.py
index 9883bef..a0bcca4 100644
--- a/util.py
+++ b/util.py
@@ -1,6 +1,7 @@
import os
import fcntl
import errno
+import xem_rpc
class net_dev:
def init(self):
@@ -44,3 +45,12 @@ def makedirs(dirpath):
pass
else:
raise
+
+def pop_dest_opts(opts):
+ dest_opts = {}
+ dest_opts["to"] = opts.pop("to")
+ for k in opts.keys():
+ if k.startswith("ssh"):
+ dest_opts[k] = opts.pop(k)
+
+ return dest_opts, opts
diff --git a/xem_rpc.py b/xem_rpc.py
index f4a9b21..669c5cb 100644
--- a/xem_rpc.py
+++ b/xem_rpc.py
@@ -4,8 +4,10 @@ import threading
import traceback
import util
import struct
+import ssh_tunnel
+import errno
-rpc_port = 12345
+default_rpc_port = 12345
rpc_sk_buf = 256
RPC_CMD = 1
@@ -14,6 +16,7 @@ RPC_CALL = 2
RPC_RESP = 1
RPC_EXC = 2
+CONNECT_ATTEMPTS = 2000
#
# Client
#
@@ -40,8 +43,11 @@ class _rpc_proxy_caller:
raise Exception("Proto resp error")
class rpc_proxy:
- def __init__(self, conn, *args):
- self._srv = conn
+ def __init__(self, conn_opts, *args):
+ self._ssh = ssh_tunnel.Tunnel(conn_opts)
+ self._srv = self._ssh.get_local_dst()
+ self._ssh.start()
+
self._rpc_sk = self._make_sk()[0]
util.set_cloexec(self._rpc_sk)
_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
@@ -51,7 +57,17 @@ class rpc_proxy:
def _make_sk(self):
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.connect((self._srv, rpc_port))
+ # ssh tunnel needs some time to start working, so lets
+ # make CONNECT_ATTEMPTS attempts to connect.
+ for n in xrange(CONNECT_ATTEMPTS):
+ try:
+ sk.connect(self._srv)
+ except socket.error as e:
+ if e.errno != errno.ECONNREFUSED or n == CONNECT_ATTEMPTS - 1:
+ raise e
+ else:
+ continue
+ break
host = eval(sk.recv(rpc_sk_buf))
return (sk, host)
@@ -62,7 +78,6 @@ class rpc_proxy:
return sk
-
#
# Server
#
@@ -118,9 +133,9 @@ class _rpc_server_sk:
self._master.on_socket_open(sk._sk, uname)
class _rpc_server_ask:
- def __init__(self):
+ def __init__(self, port):
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.bind(("0.0.0.0", rpc_port))
+ sk.bind(("127.0.0.1", port))
sk.listen(8)
self._sk = sk
util.set_cloexec(self)
@@ -144,10 +159,10 @@ class _rpc_stop_fd:
mgr.stop()
class _rpc_server_manager:
- def __init__(self, srv_class):
+ def __init__(self, srv_class, port):
self._srv_class = srv_class
self._sk_by_name = {}
- self._poll_list = [_rpc_server_ask()]
+ self._poll_list = [_rpc_server_ask(port)]
self._alive = True
def add(self, sk):
@@ -182,9 +197,9 @@ class _rpc_server_manager:
print "RPC Service stops"
class rpc_threaded_srv(threading.Thread):
- def __init__(self, srv_class):
+ def __init__(self, srv_class, port):
threading.Thread.__init__(self)
- self._mgr = _rpc_server_manager(srv_class)
+ self._mgr = _rpc_server_manager(srv_class, port)
self._stop_fd = None
def run(self):
--
1.9.3
More information about the CRIU
mailing list