[CRIU] [PATCH 1/4] p.haul: allow IP:PORT dest on p.haul and add --port to p.haul-service

Ruslan Kuprieiev kupruser at gmail.com
Mon Oct 13 03:12:44 PDT 2014


This patch allows us to specify the port to use for both p.haul and p.haul-service.

Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
 fs_haul_subtree.py |  4 +++-
 p.haul             | 16 ++++++++++++++--
 p.haul-service     |  8 +++++++-
 p_haul_iters.py    |  6 +++---
 xem_rpc.py         | 20 ++++++++++----------
 5 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/fs_haul_subtree.py b/fs_haul_subtree.py
index 3181c79..70babbc 100644
--- a/fs_haul_subtree.py
+++ b/fs_haul_subtree.py
@@ -23,7 +23,9 @@ class p_haul_fs:
 
 	def __run_rsync(self):
 		logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
-		dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
+		#FIXME using __thost[0] here because __thost is (ip,port) tuple.
+		# Maybe need to pass port to rsync too?
+		dst = "%s:%s" % (self.__thost[0], os.path.dirname(self.__root))
 
 		# First rsync might be very long. Wait for it not
 		# to produce big pause between the 1st pre-dump and
diff --git a/p.haul b/p.haul
index b8f050e..129bc4f 100755
--- a/p.haul
+++ b/p.haul
@@ -4,6 +4,7 @@ import argparse
 import p_haul_iters as ph_iters
 import images
 import criu_api
+import xem_rpc
 
 # Usage idea
 # p.haul <type> <id> <destination>
@@ -17,10 +18,21 @@ import criu_api
 # # p.haul lxc myct 10.0.0.2
 #
 
+def parse_dest(dest):
+	# parse string IP:PORT into tuple (ip,port)
+	spl = dest.split(":", 1)
+	if len(spl) == 1:
+		ip = spl[0]
+		port = xem_rpc.default_rpc_port
+	else:
+		ip = spl[0]
+		port = int(spl[1])
+	return (ip, port)
+
 parser = argparse.ArgumentParser("Process HAULer")
 parser.add_argument("type", help = "Type of hat to haul, e.g. ovz")
 parser.add_argument("id", help = "ID of what to haul")
-parser.add_argument("to", help = "IP where to haul")
+parser.add_argument("to", help = "IP:PORT where to haul")
 parser.add_argument("-v", help = "Verbosity level", default = criu_api.def_verb, type = int, dest = "verbose")
 parser.add_argument("--keep-images", help = "Keep images after migration", default = False, action = 'store_true')
 parser.add_argument("--dst-rpid", help = "Write pidfile on restore", default = None)
@@ -31,7 +43,7 @@ parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks",
 args = vars(parser.parse_args())
 
 ph_type = (args.pop("type"), args.pop("id"))
-dst = args.pop("to")
+dst = parse_dest(args.pop("to"))
 
 # Start the migration
 worker = ph_iters.phaul_iter_worker(ph_type, dst)
diff --git a/p.haul-service b/p.haul-service
index 7444df5..46d917a 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -3,15 +3,21 @@
 import signal
 import xem_rpc
 import p_haul_service as ph_srv
+import argparse
 
 if __name__ == "__main__":
+	parser = argparse.ArgumentParser("Process HAULer service server")
+	parser.add_argument("-p", "--port", help = "port to bind on", default = xem_rpc.default_rpc_port, type = int)
+
+	args = vars(parser.parse_args())
+
 	sfd = None
 	def fin(foo, bar):
 		print "Stop by %d" % foo
 		sfd.close()
 
 	print "Starting p.haul rpyc service"
-	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service)
+	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, args["port"])
 
 	# FIXME: Setup stop handlers
 	sfd = t.get_stop_fd()
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 0463e78..0717248 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -21,13 +21,13 @@ phaul_iter_min_size = 64
 phaul_iter_grow_max = 10
 
 class phaul_iter_worker:
-	def __init__(self, p_type, host):
+	def __init__(self, p_type, dst):
 		self._mstat = mstats.migration_stats()
 		self.iteration = 0
 		self.prev_stats = None
 
 		print "Connecting to target host"
-		self.th = xem_rpc.rpc_proxy(host)
+		self.th = xem_rpc.rpc_proxy(dst)
 		self.data_sk = self.th.open_socket("datask")
 
 		print "Setting up local"
@@ -42,7 +42,7 @@ class phaul_iter_worker:
 			raise Exception("No FS driver found")
 
 		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host)
+		self.fs.set_target_host(dst)
 
 		print "Setting up remote"
 		self.th.setup(p_type)
diff --git a/xem_rpc.py b/xem_rpc.py
index 9718eef..6cb5203 100644
--- a/xem_rpc.py
+++ b/xem_rpc.py
@@ -4,7 +4,7 @@ import threading
 import traceback
 import util
 
-rpc_port = 12345
+default_rpc_port = 12345
 rpc_sk_buf = 256
 
 RPC_CMD = 1
@@ -39,8 +39,8 @@ class _rpc_proxy_caller:
 			raise Exception("Proto resp error")
 
 class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
+	def __init__(self, dst, *args):
+		self._srv = dst
 		self._rpc_sk = self._make_sk()
 		util.set_cloexec(self._rpc_sk)
 		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
@@ -50,7 +50,7 @@ class rpc_proxy:
 
 	def _make_sk(self):
 		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect((self._srv, rpc_port))
+		sk.connect(self._srv)
 		return sk
 
 	def open_socket(self, uname):
@@ -116,9 +116,9 @@ class _rpc_server_sk:
 			self._master.on_socket_open(sk._sk, uname)
 
 class _rpc_server_ask:
-	def __init__(self):
+	def __init__(self, port):
 		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(("0.0.0.0", rpc_port))
+		sk.bind(("0.0.0.0", port))
 		sk.listen(8)
 		self._sk = sk
 		util.set_cloexec(self)
@@ -141,10 +141,10 @@ class _rpc_stop_fd:
 		mgr.stop()
 
 class _rpc_server_manager:
-	def __init__(self, srv_class):
+	def __init__(self, srv_class, port):
 		self._srv_class = srv_class
 		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask()]
+		self._poll_list = [_rpc_server_ask(port)]
 		self._alive = True
 
 	def add(self, sk):
@@ -179,9 +179,9 @@ class _rpc_server_manager:
 		print "RPC Service stops"
 
 class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class):
+	def __init__(self, srv_class, port):
 		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class)
+		self._mgr = _rpc_server_manager(srv_class, port)
 		self._stop_fd = None
 
 	def run(self):
-- 
1.9.3



More information about the CRIU mailing list