[CRIU] [PATCH 4/7] p.haul: implement migration over existing connections

Nikita Spiridonov nspiridonov at odin.com
Tue Oct 6 08:28:32 PDT 2015


Remove standalone mode: p.haul can now work only over existing
connections, passed to it as file descriptors via command line
arguments.

Three arguments are required: --fdrpc for RPC calls, --fdmem for c/r
images migration and --fdfs for disk migration. Each file descriptor
is expected to represent a socket opened in blocking mode with domain
AF_INET and type SOCK_STREAM.

The --to option is preserved in p.haul for now for disk migration via
rsync; it will soon become unused, at least for the vz module, which
will use ploop for disk migration.

Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
---
 p.haul                     |   51 +++++++++++++++++++------------
 p.haul-service             |   31 ++++++++++++-------
 phaul/fs_haul_shared.py    |    2 +-
 phaul/fs_haul_subtree.py   |    5 ++-
 phaul/p_haul_connection.py |   44 +++++++++++++++++++++++++++
 phaul/p_haul_iters.py      |   16 +++++-----
 phaul/p_haul_service.py    |   29 ++++++++----------
 phaul/util.py              |    1 -
 phaul/xem_rpc.py           |   71 +++++++++++--------------------------------
 phaul/xem_rpc_client.py    |   18 +----------
 10 files changed, 140 insertions(+), 128 deletions(-)
 create mode 100644 phaul/p_haul_connection.py

diff --git a/p.haul b/p.haul
index aea7971..4d3877c 100755
--- a/p.haul
+++ b/p.haul
@@ -3,34 +3,39 @@
 import sys
 import argparse
 import logging
-import phaul.p_haul_iters as ph_iters
-import phaul.images as ph_images
-import phaul.criu_api as ph_criu_api
-import phaul.xem_rpc as ph_xem_rpc
+import phaul.p_haul_iters
+import phaul.p_haul_connection
+import phaul.images
+import phaul.criu_api
+import phaul.xem_rpc
 
 # Usage idea
-# p.haul <type> <id> <destination>
+# p.haul <type> <id> --fdrpc <fd> --fdmem <fd> --fdfs <fd>
 #
-# E.g.
-#
-# # p.haul vz 100 10.0.0.1
-#
-# or
+# p.haul works over existing connections specified via command line arguments
+# as file descriptors. Three arguments are required: --fdrpc for rpc calls,
+# --fdmem for c/r images migration and --fdfs for disk migration. For testing
+# purposes the p.haul-wrap helper script can be used; it establishes the
+# required connections with the target host and calls p.haul or p.haul-service.
 #
-# # p.haul lxc myct 10.0.0.2
+# E.g.
+# p.haul vz 100 --fdrpc 3 --fdmem 4 --fdfs 5
+# p.haul lxc myct --fdrpc 3 --fdmem 4 --fdfs 5
 #
 
 parser = argparse.ArgumentParser("Process HAULer")
 parser.add_argument("type", help="Type of hat to haul, e.g. vz or lxc")
 parser.add_argument("id", help="ID of what to haul")
-parser.add_argument("to", help="IP where to haul")
-parser.add_argument("-v", help="Verbosity level", default=ph_criu_api.def_verb, type=int, dest="verbose")
+parser.add_argument("--to", help="IP where to haul")
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
+parser.add_argument("-v", help="Verbosity level", default=phaul.criu_api.def_verb, type=int, dest="verbose")
 parser.add_argument("--keep-images", help="Keep images after migration", default=False, action='store_true')
 parser.add_argument("--dst-rpid", help="Write pidfile on restore", default=None)
-parser.add_argument("--img-path", help="Dirctory where to put images", default=ph_images.def_path)
+parser.add_argument("--img-path", help="Directory where to put images", default=phaul.images.def_path)
 parser.add_argument("--pid-root", help="Path to tree's FS root")
 parser.add_argument("--force", help="Don't do any sanity (CPU compat) checks", default=False, action='store_true')
-parser.add_argument("--port", help="Port where to haul", type=int, default=ph_xem_rpc.rpc_port)
 parser.add_argument("--log-file", help="Write logging messages to specified file")
 
 args = parser.parse_args()
@@ -44,11 +49,18 @@ def log_uncaught_exception(type, value, traceback):
 	logging.error(value, exc_info=(type, value, traceback))
 sys.excepthook = log_uncaught_exception
 
-args_dict = vars(args)
-ph_type = (args_dict.pop("type"), args_dict.pop("id"))
-dst = (args_dict.pop("to"), args_dict.pop("port"))
+logging.info("Starting p.haul")
+
+# Wrap the inherited socket descriptors into a connection object
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+	args.fdfs)
 
 # Start the migration
-worker = ph_iters.phaul_iter_worker(ph_type, dst)
-worker.set_options(args_dict)
-worker.start_migration()
+ph_type = args.type, args.id
+try:
+	worker = phaul.p_haul_iters.phaul_iter_worker(ph_type, connection)
+	worker.set_options(vars(args))
+	worker.start_migration()
+finally:
+	# Close the connection even if the migration failed
+	connection.close()
diff --git a/p.haul-service b/p.haul-service
index fbaa7ff..858b0ff 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -4,12 +4,14 @@ import sys
 import signal
 import argparse
 import logging
-import phaul.xem_rpc as ph_xem_rpc
-import phaul.p_haul_service as ph_srv
+import phaul.xem_rpc
+import phaul.p_haul_service
+import phaul.p_haul_connection
 
 parser = argparse.ArgumentParser("Process HAULer service server")
-parser.add_argument("--bind-addr", help="IP to bind to", type=str, default="0.0.0.0")
-parser.add_argument("--bind-port", help="Port to bind to", type=int, default=ph_xem_rpc.rpc_port)
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
 parser.add_argument("--log-file", help="Write logging messages to specified file")
 
 args = parser.parse_args()
@@ -23,18 +25,22 @@ def log_uncaught_exception(type, value, traceback):
 	logging.error(value, exc_info=(type, value, traceback))
 sys.excepthook = log_uncaught_exception
 
-host = (args.bind_addr, args.bind_port)
-
-sfd = None
+stop_fd = None
 def fin(foo, bar):
 	logging.info("Stop by %d", foo)
-	sfd.close()
+	stop_fd.close()
+
+logging.info("Starting p.haul service")
 
-logging.info("Starting p.haul rpyc service")
-t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
+# Wrap the inherited socket descriptors into a connection object
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+	args.fdfs)
+
+t = phaul.xem_rpc.rpc_threaded_srv(phaul.p_haul_service.phaul_service,
+	connection)
 
 # FIXME: Setup stop handlers
-sfd = t.get_stop_fd()
+stop_fd = t.init_stop_fd()
 signal.signal(signal.SIGTERM, fin)
 signal.signal(signal.SIGINT, fin)
 
@@ -42,3 +48,6 @@ t.start()
 signal.pause()
 t.join()
 logging.info("Bye!")
+
+# Close the connection once the RPC service thread has finished
+connection.close()
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
index d6b3a61..2212e0e 100644
--- a/phaul/fs_haul_shared.py
+++ b/phaul/fs_haul_shared.py
@@ -8,7 +8,7 @@ class p_haul_fs:
 	def __init__(self):
 		logging.info("Initilized shared FS hauler")
 
-	def set_target_host(self, thost):
+	def set_options(self, opts):
 		pass
 
 	def set_work_dir(self, wdir):
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
index 5d90cb9..a9bd559 100644
--- a/phaul/fs_haul_subtree.py
+++ b/phaul/fs_haul_subtree.py
@@ -14,9 +14,14 @@ class p_haul_fs:
 	def __init__(self, subtree_path):
 		logging.info("Initialized subtree FS hauler (%s)", subtree_path)
 		self.__root = subtree_path
+		self.__thost = None
 
-	def set_target_host(self, thost):
-		self.__thost = thost
+	def set_options(self, opts):
+		# --to is optional on the command line now, but this hauler uses the
+		# target host for disk migration, so fail early if it is missing.
+		self.__thost = opts["to"]
+		if not self.__thost:
+			raise Exception("No target host (--to) for subtree FS migration")
 
 	def set_work_dir(self, wdir):
 		self.__wdir = wdir
diff --git a/phaul/p_haul_connection.py b/phaul/p_haul_connection.py
new file mode 100644
index 0000000..48b962a
--- /dev/null
+++ b/phaul/p_haul_connection.py
@@ -0,0 +1,56 @@
+#
+# p.haul connection module contains logic needed to establish connection
+# between p.haul and p.haul-service.
+#
+
+import logging
+import os
+import socket
+import util
+
+class connection:
+	"""p.haul connection
+
+	Class encapsulates connections required for p.haul work, including rpc
+	socket (socket for RPC calls), memory socket (socket for c/r images
+	migration) and fs socket (socket for disk migration).
+	"""
+
+	def __init__(self, rpc_sk, mem_sk, fs_sk):
+		self.rpc_sk = rpc_sk
+		self.mem_sk = mem_sk
+		self.fs_sk = fs_sk
+
+	def close(self):
+		self.rpc_sk.close()
+		self.mem_sk.close()
+		self.fs_sk.close()
+
+def establish(fdrpc, fdmem, fdfs):
+	"""Construct required socket objects from file descriptors
+
+	Expect that each file descriptor represents a socket opened in blocking
+	mode with domain AF_INET and type SOCK_STREAM. socket.fromfd() works on
+	a duplicate, so the passed descriptors are closed here: left open, they
+	would keep the connections alive after close() and leak the rpc socket
+	into exec'd children even though its duplicate is close-on-exec.
+	"""
+
+	logging.info("Use existing connections, fdrpc=%d fdmem=%d fdfs=%d", fdrpc,
+		fdmem, fdfs)
+
+	rpc_sk = socket.fromfd(fdrpc, socket.AF_INET, socket.SOCK_STREAM)
+	mem_sk = socket.fromfd(fdmem, socket.AF_INET, socket.SOCK_STREAM)
+	fs_sk = socket.fromfd(fdfs, socket.AF_INET, socket.SOCK_STREAM)
+
+	# Every socket object now owns its own duplicate, so drop the originals;
+	# set() copes with the same descriptor being passed more than once.
+	for fd in set([fdrpc, fdmem, fdfs]):
+		os.close(fd)
+
+	# Only the rpc socket is made close-on-exec; the memory and fs sockets
+	# are presumably handed on to helper processes (e.g. criu) and must stay
+	# inheritable. NOTE(review): confirm against criu_api and the FS haulers.
+	util.set_cloexec(rpc_sk)
+
+	return connection(rpc_sk, mem_sk, fs_sk)
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
index 6050236..0d929ca 100644
--- a/phaul/p_haul_iters.py
+++ b/phaul/p_haul_iters.py
@@ -23,14 +23,14 @@ phaul_iter_min_size = 64
 phaul_iter_grow_max = 10
 
 class phaul_iter_worker:
-	def __init__(self, p_type, host):
-		logging.info("Connecting to target host")
-		self.target_host = xem_rpc_client.rpc_proxy(host)
-		self.data_socket = self.target_host.open_socket("datask")
+	def __init__(self, p_type, connection):
+		self.connection = connection
+		self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
 
 		logging.info("Setting up local")
+		self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
 		self.img = images.phaul_images("dmp")
-		self.criu_connection = criu_api.criu_conn(self.data_socket)
+
 		self.htype = p_haul_type.get_src(p_type)
 		if not self.htype:
 			raise Exception("No htype driver found")
@@ -40,7 +40,6 @@ class phaul_iter_worker:
 			raise Exception("No FS driver found")
 
 		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host[0])
 
 		logging.info("Setting up remote")
 		self.target_host.setup(p_type)
@@ -53,6 +52,7 @@ class phaul_iter_worker:
 		self.criu_connection.verbose(opts["verbose"])
 		self.img.set_options(opts)
 		self.htype.set_options(opts)
+		self.fs.set_options(opts)
 		self.__force = opts["force"]
 
 	def validate_cpu(self):
@@ -65,7 +65,7 @@ class phaul_iter_worker:
 			raise Exception("Can't dump cpuinfo")
 
 		logging.info("\t`- Sending CPU info")
-		self.img.send_cpuinfo(self.target_host, self.data_socket)
+		self.img.send_cpuinfo(self.target_host, self.connection.mem_sk)
 
 		logging.info("\t`- Checking CPU info")
 		if not self.target_host.check_cpuinfo():
@@ -185,7 +185,7 @@ class phaul_iter_worker:
 		logging.info("Final FS and images sync")
 		self.fs.stop_migration()
 		self.img.sync_imgs_to_target(self.target_host, self.htype,
-			self.data_socket)
+			self.connection.mem_sk)
 
 		logging.info("Asking target host to restore")
 		self.target_host.restore_from_images()
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
index fd3c611..b3d4335 100644
--- a/phaul/p_haul_service.py
+++ b/phaul/p_haul_service.py
@@ -10,23 +10,24 @@ import criu_req
 import p_haul_type
 
 class phaul_service:
-	def on_connect(self):
-		logging.info("Connected")
-		self.dump_iter = 0
-		self.restored = False
+	def __init__(self, mem_sk, fs_sk):
+		self._mem_sk = mem_sk
+		self._fs_sk = fs_sk
+
 		self.criu_connection = None
-		self.data_socket = None
 		self.img = None
 		self.htype = None
+		self.dump_iter_index = 0
+		self.restored = False
+
+	def on_connect(self):
+		logging.info("Connected")
 
 	def on_disconnect(self):
 		logging.info("Disconnected")
 		if self.criu_connection:
 			self.criu_connection.close()
 
-		if self.data_socket:
-			self.data_socket.close()
-
 		if self.htype and not self.restored:
 			self.htype.umount()
 
@@ -36,14 +37,10 @@ class phaul_service:
 				self.img.save_images()
 			self.img.close()
 
-	def on_socket_open(self, sk, uname):
-		self.data_socket = sk
-		logging.info("Data socket (%s) accepted", uname)
-
 	def rpc_setup(self, htype_id):
 		logging.info("Setting up service side %s", htype_id)
 		self.img = images.phaul_images("rst")
-		self.criu_connection = criu_api.criu_conn(self.data_socket)
+		self.criu_connection = criu_api.criu_conn(self._mem_sk)
 		self.htype = p_haul_type.get_dst(htype_id)
 
 	def rpc_set_options(self, opts):
@@ -52,7 +49,7 @@ class phaul_service:
 		self.htype.set_options(opts)
 
 	def start_page_server(self):
-		logging.info("Starting page server for iter %d", self.dump_iter)
+		logging.info("Starting page server for iter %d", self.dump_iter_index)
 
 		logging.info("\tSending criu rpc req")
 		req = criu_req.make_page_server_req(self.htype, self.img,
@@ -64,7 +61,7 @@ class phaul_service:
 		logging.info("\tPage server started at %d", resp.ps.pid)
 
 	def rpc_start_iter(self):
-		self.dump_iter += 1
+		self.dump_iter_index += 1
 		self.img.new_image_dir()
 		self.start_page_server()
 
@@ -72,7 +69,7 @@ class phaul_service:
 		pass
 
 	def rpc_start_accept_images(self, dir_id):
-		self.img.start_accept_images(dir_id, self.data_socket)
+		self.img.start_accept_images(dir_id, self._mem_sk)
 
 	def rpc_stop_accept_images(self):
 		self.img.stop_accept_images()
diff --git a/phaul/util.py b/phaul/util.py
index b15c75f..1bfd6d1 100644
--- a/phaul/util.py
+++ b/phaul/util.py
@@ -33,7 +33,6 @@ def bridge_add(ifname, brname):
 	os.system("brctl addif %s %s" % (brname, ifname))
 
 def set_cloexec(sk):
-	fd = sk.fileno()
 	flags = fcntl.fcntl(sk, fcntl.F_GETFD)
 	fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
 
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
index 92963d6..597bede 100644
--- a/phaul/xem_rpc.py
+++ b/phaul/xem_rpc.py
@@ -9,7 +9,6 @@ import traceback
 import logging
 import util
 
-rpc_port = 12345
 rpc_sk_buf = 16384
 
 RPC_CMD = 1
@@ -20,25 +19,18 @@ RPC_EXC = 2
 
 class _rpc_server_sk:
 	def __init__(self, sk):
-		self._sk = sk
+		self._rpc_sk = sk
 		self._master = None
 
 	def fileno(self):
-		return self._sk.fileno()
-
-	def hash_name(self):
-		return self._sk.getpeername()
-
-	def get_name(self, mgr):
-		return self.hash_name()
+		return self._rpc_sk.fileno()
 
 	def work(self, mgr):
-		raw_data = self._sk.recv(rpc_sk_buf)
+		raw_data = self._rpc_sk.recv(rpc_sk_buf)
 		if not raw_data:
-			mgr.remove(self)
+			mgr.remove_poll_item(self)
 			if self._master:
 				self._master.on_disconnect()
-			self._sk.close()
 			return
 
 		data = eval(raw_data)
@@ -59,33 +51,12 @@ class _rpc_server_sk:
 			res = (RPC_RESP, res)
 
 		raw_data = repr(res)
-		self._sk.send(raw_data)
+		self._rpc_sk.send(raw_data)
 
 	def init_rpc(self, mgr, args):
-		util.set_cloexec(self)
 		self._master = mgr.make_master()
 		self._master.on_connect(*args)
 
-	def pick_channel(self, mgr, hash_name, uname):
-		sk = mgr.pick_sk(hash_name)
-		if sk:
-			self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
-	def __init__(self, host):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(host)
-		sk.listen(8)
-		self._sk = sk
-		util.set_cloexec(self)
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def work(self, mgr):
-		sk, addr = self._sk.accept()
-		mgr.add(_rpc_server_sk(sk))
-
 class _rpc_stop_fd:
 	def __init__(self, fd):
 		self._fd = fd
@@ -97,35 +68,29 @@ class _rpc_stop_fd:
 		mgr.stop()
 
 class _rpc_server_manager:
-	def __init__(self, srv_class, host):
+	def __init__(self, srv_class, connection):
 		self._srv_class = srv_class
-		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask(host)]
+		self._connection = connection
+		self._poll_list = []
 		self._alive = True
 
-	def add(self, sk):
-		self._sk_by_name[sk.hash_name()] = sk
-		self._poll_list.append(sk)
+		self.add_poll_item(_rpc_server_sk(connection.rpc_sk))
 
-	def remove(self, sk):
-		self._sk_by_name.pop(sk.hash_name())
-		self._poll_list.remove(sk)
+	def add_poll_item(self, item):
+		self._poll_list.append(item)
 
-	def pick_sk(self, hash_name):
-		sk = self._sk_by_name.pop(hash_name, None)
-		if sk:
-			self._poll_list.remove(sk)
-		return sk
+	def remove_poll_item(self, item):
+		self._poll_list.remove(item)
 
 	def make_master(self):
-		return self._srv_class()
+		return self._srv_class(self._connection.mem_sk, self._connection.fs_sk)
 
 	def stop(self):
 		self._alive = False
 
 	def loop(self, stop_fd):
 		if stop_fd:
-			self._poll_list.append(_rpc_stop_fd(stop_fd))
+			self.add_poll_item(_rpc_stop_fd(stop_fd))
 
 		while self._alive:
 			r, w, x = select.select(self._poll_list, [], [])
@@ -135,9 +100,9 @@ class _rpc_server_manager:
 		logging.info("RPC Service stops")
 
 class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class, host):
+	def __init__(self, srv_class, connection):
 		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class, host)
+		self._mgr = _rpc_server_manager(srv_class, connection)
 		self._stop_fd = None
 
 	def run(self):
@@ -146,7 +111,7 @@ class rpc_threaded_srv(threading.Thread):
 		except:
 			logging.exception("Exception in rpc_threaded_srv")
 
-	def get_stop_fd(self):
+	def init_stop_fd(self):
 		sks = socket.socketpair()
 		self._stop_fd = sks[0]
 		return sks[1]
diff --git a/phaul/xem_rpc_client.py b/phaul/xem_rpc_client.py
index 07246c0..c6fe8c3 100644
--- a/phaul/xem_rpc_client.py
+++ b/phaul/xem_rpc_client.py
@@ -29,24 +29,10 @@ class _rpc_proxy_caller:
 			raise Exception("Proto resp error")
 
 class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
-		self._rpc_sk = self._make_sk()
-		util.set_cloexec(self._rpc_sk)
+	def __init__(self, sk, *args):
+		self._rpc_sk = sk
 		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "init_rpc")
 		c(args)
 
 	def __getattr__(self, attr):
 		return _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CALL, attr)
-
-	def _make_sk(self):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect(self._srv)
-		return sk
-
-	def open_socket(self, uname):
-		sk = self._make_sk()
-		host = _rpc_proxy_caller(sk, xem_rpc.RPC_CMD, "get_name")()
-		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "pick_channel")
-		c(host, uname)
-		return sk
-- 
1.7.1



More information about the CRIU mailing list