[CRIU] [PATCH 1/4] p.haul: allow IP:PORT dest on p.haul and add --port to p.haul-service
Ruslan Kuprieiev
kupruser at gmail.com
Mon Oct 13 03:12:44 PDT 2014
This patch allows us to specify the port to use for both p.haul and p.haul-service.
Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
fs_haul_subtree.py | 4 +++-
p.haul | 16 ++++++++++++++--
p.haul-service | 8 +++++++-
p_haul_iters.py | 6 +++---
xem_rpc.py | 20 ++++++++++----------
5 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/fs_haul_subtree.py b/fs_haul_subtree.py
index 3181c79..70babbc 100644
--- a/fs_haul_subtree.py
+++ b/fs_haul_subtree.py
@@ -23,7 +23,9 @@ class p_haul_fs:
def __run_rsync(self):
logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
- dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
+ #FIXME using __thost[0] here because __thost is (ip,port) tuple.
+ # Maybe need to pass port to rsync too?
+ dst = "%s:%s" % (self.__thost[0], os.path.dirname(self.__root))
# First rsync might be very long. Wait for it not
# to produce big pause between the 1st pre-dump and
diff --git a/p.haul b/p.haul
index b8f050e..129bc4f 100755
--- a/p.haul
+++ b/p.haul
@@ -4,6 +4,7 @@ import argparse
import p_haul_iters as ph_iters
import images
import criu_api
+import xem_rpc
# Usage idea
# p.haul <type> <id> <destination>
@@ -17,10 +18,21 @@ import criu_api
# # p.haul lxc myct 10.0.0.2
#
+def parse_dest(dest):
+ # parse string IP:PORT into tuple (ip,port)
+ spl = dest.split(":", 1)
+ if len(spl) == 1:
+ ip = spl[0]
+ port = xem_rpc.default_rpc_port
+ else:
+ ip = spl[0]
+ port = int(spl[1])
+ return (ip, port)
+
parser = argparse.ArgumentParser("Process HAULer")
parser.add_argument("type", help = "Type of hat to haul, e.g. ovz")
parser.add_argument("id", help = "ID of what to haul")
-parser.add_argument("to", help = "IP where to haul")
+parser.add_argument("to", help = "IP:PORT where to haul")
parser.add_argument("-v", help = "Verbosity level", default = criu_api.def_verb, type = int, dest = "verbose")
parser.add_argument("--keep-images", help = "Keep images after migration", default = False, action = 'store_true')
parser.add_argument("--dst-rpid", help = "Write pidfile on restore", default = None)
@@ -31,7 +43,7 @@ parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks",
args = vars(parser.parse_args())
ph_type = (args.pop("type"), args.pop("id"))
-dst = args.pop("to")
+dst = parse_dest(args.pop("to"))
# Start the migration
worker = ph_iters.phaul_iter_worker(ph_type, dst)
diff --git a/p.haul-service b/p.haul-service
index 7444df5..46d917a 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -3,15 +3,21 @@
import signal
import xem_rpc
import p_haul_service as ph_srv
+import argparse
if __name__ == "__main__":
+ parser = argparse.ArgumentParser("Process HAULer service server")
+ parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
+
+ args = vars(parser.parse_args())
+
sfd = None
def fin(foo, bar):
print "Stop by %d" % foo
sfd.close()
print "Starting p.haul rpyc service"
- t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
+ t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
# FIXME: Setup stop handlers
sfd = t.get_stop_fd()
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 0463e78..0717248 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -21,13 +21,13 @@ phaul_iter_min_size = 64
phaul_iter_grow_max = 10
class phaul_iter_worker:
- def __init__(self, p_type, host):
+ def __init__(self, p_type, dst):
self._mstat = mstats.migration_stats()
self.iteration = 0
self.prev_stats = None
print "Connecting to target host"
- self.th = xem_rpc.rpc_proxy(host)
+ self.th = xem_rpc.rpc_proxy(dst)
self.data_sk = self.th.open_socket("datask")
print "Setting up local"
@@ -42,7 +42,7 @@ class phaul_iter_worker:
raise Exception("No FS driver found")
self.pid = self.htype.root_task_pid()
- self.fs.set_target_host(host)
+ self.fs.set_target_host(dst)
print "Setting up remote"
self.th.setup(p_type)
diff --git a/xem_rpc.py b/xem_rpc.py
index 9718eef..6cb5203 100644
--- a/xem_rpc.py
+++ b/xem_rpc.py
@@ -4,7 +4,7 @@ import threading
import traceback
import util
-rpc_port = 12345
+default_rpc_port = 12345
rpc_sk_buf = 256
RPC_CMD = 1
@@ -39,8 +39,8 @@ class _rpc_proxy_caller:
raise Exception("Proto resp error")
class rpc_proxy:
- def __init__(self, conn, *args):
- self._srv = conn
+ def __init__(self, dst, *args):
+ self._srv = dst
self._rpc_sk = self._make_sk()
util.set_cloexec(self._rpc_sk)
_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
@@ -50,7 +50,7 @@ class rpc_proxy:
def _make_sk(self):
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.connect((self._srv, rpc_port))
+ sk.connect(self._srv)
return sk
def open_socket(self, uname):
@@ -116,9 +116,9 @@ class _rpc_server_sk:
self._master.on_socket_open(sk._sk, uname)
class _rpc_server_ask:
- def __init__(self):
+ def __init__(self, port):
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.bind(("0.0.0.0", rpc_port))
+ sk.bind(("0.0.0.0", port))
sk.listen(8)
self._sk = sk
util.set_cloexec(self)
@@ -141,10 +141,10 @@ class _rpc_stop_fd:
mgr.stop()
class _rpc_server_manager:
- def __init__(self, srv_class):
+ def __init__(self, srv_class, port):
self._srv_class = srv_class
self._sk_by_name = {}
- self._poll_list = [_rpc_server_ask()]
+ self._poll_list = [_rpc_server_ask(port)]
self._alive = True
def add(self, sk):
@@ -179,9 +179,9 @@ class _rpc_server_manager:
print "RPC Service stops"
class rpc_threaded_srv(threading.Thread):
- def __init__(self, srv_class):
+ def __init__(self, srv_class, port):
threading.Thread.__init__(self)
- self._mgr = _rpc_server_manager(srv_class)
+ self._mgr = _rpc_server_manager(srv_class, port)
self._stop_fd = None
def run(self):
--
1.9.3
More information about the CRIU
mailing list