[CRIU] [PATCH 01/10] p.haul: implement migration over existing connections

Nikita Spiridonov nspiridonov at odin.com
Fri Oct 9 10:11:33 PDT 2015


Remove standalone mode: p.haul can now work only over existing
connections, which are passed on the command line as file
descriptors.

Three arguments are required: --fdrpc for RPC calls, --fdmem for
C/R image migration and --fdfs for disk migration. Each file
descriptor is expected to be a socket opened in blocking mode with
domain AF_INET and type SOCK_STREAM.

The --to option is preserved in p.haul for now, for disk migration
via rsync; it will soon become unused, at least by the vz module,
which will use ploop for disk migration.

Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
---
 p.haul                     |   40 ++++++++++++++++++-----------
 p.haul-service             |   20 ++++++++++----
 phaul/fs_haul_shared.py    |    2 +-
 phaul/fs_haul_subtree.py   |    5 ++-
 phaul/p_haul_connection.py |   44 ++++++++++++++++++++++++++++++++
 phaul/p_haul_iters.py      |   16 ++++++------
 phaul/p_haul_service.py    |   20 ++++++---------
 phaul/xem_rpc.py           |   59 +++++++++-----------------------------------
 phaul/xem_rpc_client.py    |   18 +------------
 9 files changed, 117 insertions(+), 107 deletions(-)
 create mode 100644 phaul/p_haul_connection.py

diff --git a/p.haul b/p.haul
index e807c8f..5f7b723 100755
--- a/p.haul
+++ b/p.haul
@@ -6,24 +6,29 @@ import logging
 import phaul.p_haul_iters as ph_iters
 import phaul.images as ph_images
 import phaul.criu_api as ph_criu_api
-import phaul.xem_rpc as ph_xem_rpc
+import phaul.p_haul_connection
 
 # Usage idea
-# p.haul <type> <id> <destination>
+# p.haul <type> <id> --fdrpc <fd> --fdmem <fd> --fdfs <fd>
 #
-# E.g.
-#
-# # p.haul vz 100 10.0.0.1
-#
-# or
+# p.haul works over existing connections specified via command line arguments
+# as file descriptors. Three arguments are required: --fdrpc for rpc calls,
+# --fdmem for c/r images migration and --fdfs for disk migration. For testing
+# purposes the p.haul-wrap helper script can be used, which establishes the
+# required connections with the target host and calls p.haul or p.haul-service.
 #
-# # p.haul lxc myct 10.0.0.2
+# E.g.
+# p.haul vz 100 --fdrpc 3 --fdmem 4 --fdfs 5
+# p.haul lxc myct --fdrpc 3 --fdmem 4 --fdfs 5
 #
 
 parser = argparse.ArgumentParser("Process HAULer")
 parser.add_argument("type", help="Type of hat to haul, e.g. vz or lxc")
 parser.add_argument("id", help="ID of what to haul")
-parser.add_argument("to", help="IP where to haul")
+parser.add_argument("--to", help="IP where to haul")
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
 parser.add_argument("-v", help="Verbosity level", default=ph_criu_api.def_verb, type=int, dest="verbose")
 parser.add_argument("--keep-images", help="Keep images after migration", default=False, action='store_true')
 parser.add_argument("--dst-rpid", help="Write pidfile on restore", default=None)
@@ -31,7 +36,6 @@ parser.add_argument("--img-path", help="Directory where to put images",
 		    default=ph_images.def_path)
 parser.add_argument("--pid-root", help="Path to tree's FS root")
 parser.add_argument("--force", help="Don't do any sanity (CPU compat) checks", default=False, action='store_true')
-parser.add_argument("--port", help="Port where to haul", type=int, default=ph_xem_rpc.rpc_port)
 parser.add_argument("--log-file", help="Write logging messages to specified file")
 parser.add_argument("-j", "--shell-job",help ="Allow migration of shell jobs",
 		    default=False, action='store_true')
@@ -47,11 +51,17 @@ def log_uncaught_exception(type, value, traceback):
 	logging.error(value, exc_info=(type, value, traceback))
 sys.excepthook = log_uncaught_exception
 
-args_dict = vars(args)
-ph_type = (args_dict.pop("type"), args_dict.pop("id"))
-dst = (args_dict.pop("to"), args_dict.pop("port"))
+logging.info("Starting p.haul")
+
+# Establish connection
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+	args.fdfs)
 
 # Start the migration
-worker = ph_iters.phaul_iter_worker(ph_type, dst)
-worker.set_options(args_dict)
+ph_type = args.type, args.id
+worker = ph_iters.phaul_iter_worker(ph_type, connection)
+worker.set_options(vars(args))
 worker.start_migration()
+
+# Close connection
+connection.close()
diff --git a/p.haul-service b/p.haul-service
index fbaa7ff..5d0a9e8 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -6,10 +6,12 @@ import argparse
 import logging
 import phaul.xem_rpc as ph_xem_rpc
 import phaul.p_haul_service as ph_srv
+import phaul.p_haul_connection
 
 parser = argparse.ArgumentParser("Process HAULer service server")
-parser.add_argument("--bind-addr", help="IP to bind to", type=str, default="0.0.0.0")
-parser.add_argument("--bind-port", help="Port to bind to", type=int, default=ph_xem_rpc.rpc_port)
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
 parser.add_argument("--log-file", help="Write logging messages to specified file")
 
 args = parser.parse_args()
@@ -23,15 +25,18 @@ def log_uncaught_exception(type, value, traceback):
 	logging.error(value, exc_info=(type, value, traceback))
 sys.excepthook = log_uncaught_exception
 
-host = (args.bind_addr, args.bind_port)
-
 sfd = None
 def fin(foo, bar):
 	logging.info("Stop by %d", foo)
 	sfd.close()
 
-logging.info("Starting p.haul rpyc service")
-t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
+logging.info("Starting p.haul service")
+
+# Establish connection
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+	args.fdfs)
+
+t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, connection)
 
 # FIXME: Setup stop handlers
 sfd = t.get_stop_fd()
@@ -42,3 +47,6 @@ t.start()
 signal.pause()
 t.join()
 logging.info("Bye!")
+
+# Close connection
+connection.close()
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
index d6b3a61..2212e0e 100644
--- a/phaul/fs_haul_shared.py
+++ b/phaul/fs_haul_shared.py
@@ -8,7 +8,7 @@ class p_haul_fs:
 	def __init__(self):
 		logging.info("Initilized shared FS hauler")
 
-	def set_target_host(self, thost):
+	def set_options(self, opts):
 		pass
 
 	def set_work_dir(self, wdir):
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
index 5d90cb9..a9bd559 100644
--- a/phaul/fs_haul_subtree.py
+++ b/phaul/fs_haul_subtree.py
@@ -14,9 +14,10 @@ class p_haul_fs:
 	def __init__(self, subtree_path):
 		logging.info("Initialized subtree FS hauler (%s)", subtree_path)
 		self.__root = subtree_path
+		self.__thost = None
 
-	def set_target_host(self, thost):
-		self.__thost = thost
+	def set_options(self, opts):
+		self.__thost = opts["to"]
 
 	def set_work_dir(self, wdir):
 		self.__wdir = wdir
diff --git a/phaul/p_haul_connection.py b/phaul/p_haul_connection.py
new file mode 100644
index 0000000..48b962a
--- /dev/null
+++ b/phaul/p_haul_connection.py
@@ -0,0 +1,44 @@
+#
+# p.haul connection module contains the logic needed to establish connections
+# between p.haul and p.haul-service.
+#
+
+import logging
+import socket
+import util
+
+class connection:
+	"""p.haul connection
+
+	Class encapsulates the connections required for p.haul to work: the rpc
+	socket (socket for RPC calls), the memory socket (socket for c/r images
+	migration) and the fs socket (socket for disk migration).
+	"""
+
+	def __init__(self, rpc_sk, mem_sk, fs_sk):
+		self.rpc_sk = rpc_sk
+		self.mem_sk = mem_sk
+		self.fs_sk = fs_sk
+
+	def close(self):
+		self.rpc_sk.close()
+		self.mem_sk.close()
+		self.fs_sk.close()
+
+def establish(fdrpc, fdmem, fdfs):
+	"""Construct required socket objects from file descriptors
+
+	Expects each file descriptor to refer to a socket opened in blocking mode
+	with domain AF_INET and type SOCK_STREAM.
+	"""
+
+	logging.info("Use existing connections, fdrpc=%d fdmem=%d fdfs=%d", fdrpc,
+		fdmem, fdfs)
+
+	rpc_sk = socket.fromfd(fdrpc, socket.AF_INET, socket.SOCK_STREAM)
+	mem_sk = socket.fromfd(fdmem, socket.AF_INET, socket.SOCK_STREAM)
+	fs_sk = socket.fromfd(fdfs, socket.AF_INET, socket.SOCK_STREAM)
+
+	util.set_cloexec(rpc_sk)
+
+	return connection(rpc_sk, mem_sk, fs_sk)
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
index ba96de3..06049d3 100644
--- a/phaul/p_haul_iters.py
+++ b/phaul/p_haul_iters.py
@@ -23,14 +23,14 @@ phaul_iter_min_size = 64
 phaul_iter_grow_max = 10
 
 class phaul_iter_worker:
-	def __init__(self, p_type, host):
-		logging.info("Connecting to target host")
-		self.target_host = xem_rpc_client.rpc_proxy(host)
-		self.data_socket = self.target_host.open_socket("datask")
+	def __init__(self, p_type, connection):
+		self.connection = connection
+		self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
 
 		logging.info("Setting up local")
+		self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
 		self.img = images.phaul_images("dmp")
-		self.criu_connection = criu_api.criu_conn(self.data_socket)
+
 		self.htype = p_haul_type.get_src(p_type)
 		if not self.htype:
 			raise Exception("No htype driver found")
@@ -40,7 +40,6 @@ class phaul_iter_worker:
 			raise Exception("No FS driver found")
 
 		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host[0])
 
 		logging.info("Setting up remote")
 		self.target_host.setup(p_type)
@@ -54,6 +53,7 @@ class phaul_iter_worker:
 		self.criu_connection.shell_job(opts["shell_job"])
 		self.img.set_options(opts)
 		self.htype.set_options(opts)
+		self.fs.set_options(opts)
 		self.__force = opts["force"]
 
 	def validate_cpu(self):
@@ -66,7 +66,7 @@ class phaul_iter_worker:
 			raise Exception("Can't dump cpuinfo")
 
 		logging.info("\t`- Sending CPU info")
-		self.img.send_cpuinfo(self.target_host, self.data_socket)
+		self.img.send_cpuinfo(self.target_host, self.connection.mem_sk)
 
 		logging.info("\t`- Checking CPU info")
 		if not self.target_host.check_cpuinfo():
@@ -200,7 +200,7 @@ class phaul_iter_worker:
 		logging.info("Final FS and images sync")
 		self.fs.stop_migration()
 		self.img.sync_imgs_to_target(self.target_host, self.htype,
-			self.data_socket)
+			self.connection.mem_sk)
 
 		logging.info("Asking target host to restore")
 		self.target_host.restore_from_images()
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
index 0aceafb..409487a 100644
--- a/phaul/p_haul_service.py
+++ b/phaul/p_haul_service.py
@@ -10,23 +10,23 @@ import criu_req
 import p_haul_type
 
 class phaul_service:
-	def on_connect(self):
-		logging.info("Connected")
-		self.dump_iter = 0
-		self.restored = False
+	def __init__(self, mem_sk, fs_sk):
 		self.criu_connection = None
-		self.data_socket = None
+		self.data_socket = mem_sk
+		self._fs_sk = fs_sk
 		self.img = None
 		self.htype = None
+		self.dump_iter = 0
+		self.restored = False
+
+	def on_connect(self):
+		logging.info("Connected")
 
 	def on_disconnect(self):
 		logging.info("Disconnected")
 		if self.criu_connection:
 			self.criu_connection.close()
 
-		if self.data_socket:
-			self.data_socket.close()
-
 		if self.htype and not self.restored:
 			self.htype.umount()
 
@@ -36,10 +36,6 @@ class phaul_service:
 				self.img.save_images()
 			self.img.close()
 
-	def on_socket_open(self, sk, uname):
-		self.data_socket = sk
-		logging.info("Data socket (%s) accepted", uname)
-
 	def rpc_setup(self, htype_id):
 		logging.info("Setting up service side %s", htype_id)
 		self.img = images.phaul_images("rst")
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
index 92963d6..bafd994 100644
--- a/phaul/xem_rpc.py
+++ b/phaul/xem_rpc.py
@@ -9,7 +9,6 @@ import traceback
 import logging
 import util
 
-rpc_port = 12345
 rpc_sk_buf = 16384
 
 RPC_CMD = 1
@@ -26,19 +25,12 @@ class _rpc_server_sk:
 	def fileno(self):
 		return self._sk.fileno()
 
-	def hash_name(self):
-		return self._sk.getpeername()
-
-	def get_name(self, mgr):
-		return self.hash_name()
-
 	def work(self, mgr):
 		raw_data = self._sk.recv(rpc_sk_buf)
 		if not raw_data:
 			mgr.remove(self)
 			if self._master:
 				self._master.on_disconnect()
-			self._sk.close()
 			return
 
 		data = eval(raw_data)
@@ -62,30 +54,9 @@ class _rpc_server_sk:
 		self._sk.send(raw_data)
 
 	def init_rpc(self, mgr, args):
-		util.set_cloexec(self)
 		self._master = mgr.make_master()
 		self._master.on_connect(*args)
 
-	def pick_channel(self, mgr, hash_name, uname):
-		sk = mgr.pick_sk(hash_name)
-		if sk:
-			self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
-	def __init__(self, host):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(host)
-		sk.listen(8)
-		self._sk = sk
-		util.set_cloexec(self)
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def work(self, mgr):
-		sk, addr = self._sk.accept()
-		mgr.add(_rpc_server_sk(sk))
-
 class _rpc_stop_fd:
 	def __init__(self, fd):
 		self._fd = fd
@@ -97,35 +68,29 @@ class _rpc_stop_fd:
 		mgr.stop()
 
 class _rpc_server_manager:
-	def __init__(self, srv_class, host):
+	def __init__(self, srv_class, connection):
 		self._srv_class = srv_class
-		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask(host)]
+		self._connection = connection
+		self._poll_list = []
 		self._alive = True
 
-	def add(self, sk):
-		self._sk_by_name[sk.hash_name()] = sk
-		self._poll_list.append(sk)
+		self.add(_rpc_server_sk(connection.rpc_sk))
 
-	def remove(self, sk):
-		self._sk_by_name.pop(sk.hash_name())
-		self._poll_list.remove(sk)
+	def add(self, item):
+		self._poll_list.append(item)
 
-	def pick_sk(self, hash_name):
-		sk = self._sk_by_name.pop(hash_name, None)
-		if sk:
-			self._poll_list.remove(sk)
-		return sk
+	def remove(self, item):
+		self._poll_list.remove(item)
 
 	def make_master(self):
-		return self._srv_class()
+		return self._srv_class(self._connection.mem_sk, self._connection.fs_sk)
 
 	def stop(self):
 		self._alive = False
 
 	def loop(self, stop_fd):
 		if stop_fd:
-			self._poll_list.append(_rpc_stop_fd(stop_fd))
+			self.add(_rpc_stop_fd(stop_fd))
 
 		while self._alive:
 			r, w, x = select.select(self._poll_list, [], [])
@@ -135,9 +100,9 @@ class _rpc_server_manager:
 		logging.info("RPC Service stops")
 
 class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class, host):
+	def __init__(self, srv_class, connection):
 		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class, host)
+		self._mgr = _rpc_server_manager(srv_class, connection)
 		self._stop_fd = None
 
 	def run(self):
diff --git a/phaul/xem_rpc_client.py b/phaul/xem_rpc_client.py
index 07246c0..c6fe8c3 100644
--- a/phaul/xem_rpc_client.py
+++ b/phaul/xem_rpc_client.py
@@ -29,24 +29,10 @@ class _rpc_proxy_caller:
 			raise Exception("Proto resp error")
 
 class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
-		self._rpc_sk = self._make_sk()
-		util.set_cloexec(self._rpc_sk)
+	def __init__(self, sk, *args):
+		self._rpc_sk = sk
 		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "init_rpc")
 		c(args)
 
 	def __getattr__(self, attr):
 		return _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CALL, attr)
-
-	def _make_sk(self):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect(self._srv)
-		return sk
-
-	def open_socket(self, uname):
-		sk = self._make_sk()
-		host = _rpc_proxy_caller(sk, xem_rpc.RPC_CMD, "get_name")()
-		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "pick_channel")
-		c(host, uname)
-		return sk
-- 
1.7.1



More information about the CRIU mailing list