[CRIU] [PATCH 01/10] p.haul: implement migration over existing connections

Tycho Andersen tycho.andersen at canonical.com
Wed Oct 14 12:27:33 PDT 2015


Hi Nikita,

Thanks for this work, it will be very useful for us.

On Fri, Oct 09, 2015 at 09:11:33PM +0400, Nikita Spiridonov wrote:
> Remove standalone mode; p.haul can now work only over existing
> connections specified via command-line arguments as file
> descriptors.
> 
> Three arguments are required: --fdrpc for RPC calls, --fdmem for C/R
> image migration, and --fdfs for disk migration. Each file descriptor
> is expected to represent a socket opened in blocking mode with domain
> AF_INET and type SOCK_STREAM.

Do we have to require --fdfs here for anything? I haven't looked
through the code to see exactly why it is required.

In LXD (and I guess OpenVZ as well, with your ploop patch) we manage
our own storage backends and have our own mechanism for
transporting the rootfs. Ideally, I could invoke p.haul over an fd to
do just the iterative CRIU piece, and potentially get some callbacks
to tell LXD when the process is stopped so that we can do a final fs
sync.

Thanks,

Tycho

> The --to option is preserved in p.haul for now, for disk migration via
> rsync; it will soon be unused, at least by the vz module, which will
> use ploop for disk migration.
> 
> Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
> ---
>  p.haul                     |   40 ++++++++++++++++++-----------
>  p.haul-service             |   20 ++++++++++----
>  phaul/fs_haul_shared.py    |    2 +-
>  phaul/fs_haul_subtree.py   |    5 ++-
>  phaul/p_haul_connection.py |   44 ++++++++++++++++++++++++++++++++
>  phaul/p_haul_iters.py      |   16 ++++++------
>  phaul/p_haul_service.py    |   20 ++++++---------
>  phaul/xem_rpc.py           |   59 +++++++++-----------------------------------
>  phaul/xem_rpc_client.py    |   18 +------------
>  9 files changed, 117 insertions(+), 107 deletions(-)
>  create mode 100644 phaul/p_haul_connection.py
> 
> diff --git a/p.haul b/p.haul
> index e807c8f..5f7b723 100755
> --- a/p.haul
> +++ b/p.haul
> @@ -6,24 +6,29 @@ import logging
>  import phaul.p_haul_iters as ph_iters
>  import phaul.images as ph_images
>  import phaul.criu_api as ph_criu_api
> -import phaul.xem_rpc as ph_xem_rpc
> +import phaul.p_haul_connection
>  
>  # Usage idea
> -# p.haul <type> <id> <destination>
> +# p.haul <type> <id> --fdrpc <fd> --fdmem <fd> --fdfs <fd>
>  #
> -# E.g.
> -#
> -# # p.haul vz 100 10.0.0.1
> -#
> -# or
> +# p.haul work over existing connections specified via command line arguments
> +# as file descriptors. Three arguments required - --fdrpc for rpc calls,
> +# --fdmem for c/r images migration and --fdfs for disk migration. For testing
> +# purposed p.haul-wrap helper script can be used which establish required
> +# connections with target host and call p.haul or p.haul-service.
>  #
> -# # p.haul lxc myct 10.0.0.2
> +# E.g.
> +# p.haul vz 100 --fdrpc 3 --fdmem 4 --fdfs 5
> +# p.haul lxc myct --fdrpc 3 --fdmem 4 --fdfs 5
>  #
>  
>  parser = argparse.ArgumentParser("Process HAULer")
>  parser.add_argument("type", help="Type of hat to haul, e.g. vz or lxc")
>  parser.add_argument("id", help="ID of what to haul")
> -parser.add_argument("to", help="IP where to haul")
> +parser.add_argument("--to", help="IP where to haul")
> +parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
> +parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
> +parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
>  parser.add_argument("-v", help="Verbosity level", default=ph_criu_api.def_verb, type=int, dest="verbose")
>  parser.add_argument("--keep-images", help="Keep images after migration", default=False, action='store_true')
>  parser.add_argument("--dst-rpid", help="Write pidfile on restore", default=None)
> @@ -31,7 +36,6 @@ parser.add_argument("--img-path", help="Directory where to put images",
>  		    default=ph_images.def_path)
>  parser.add_argument("--pid-root", help="Path to tree's FS root")
>  parser.add_argument("--force", help="Don't do any sanity (CPU compat) checks", default=False, action='store_true')
> -parser.add_argument("--port", help="Port where to haul", type=int, default=ph_xem_rpc.rpc_port)
>  parser.add_argument("--log-file", help="Write logging messages to specified file")
>  parser.add_argument("-j", "--shell-job",help ="Allow migration of shell jobs",
>  		    default=False, action='store_true')
> @@ -47,11 +51,17 @@ def log_uncaught_exception(type, value, traceback):
>  	logging.error(value, exc_info=(type, value, traceback))
>  sys.excepthook = log_uncaught_exception
>  
> -args_dict = vars(args)
> -ph_type = (args_dict.pop("type"), args_dict.pop("id"))
> -dst = (args_dict.pop("to"), args_dict.pop("port"))
> +logging.info("Starting p.haul")
> +
> +# Establish connection
> +connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
> +	args.fdfs)
>  
>  # Start the migration
> -worker = ph_iters.phaul_iter_worker(ph_type, dst)
> -worker.set_options(args_dict)
> +ph_type = args.type, args.id
> +worker = ph_iters.phaul_iter_worker(ph_type, connection)
> +worker.set_options(vars(args))
>  worker.start_migration()
> +
> +# Close connection
> +connection.close()
> diff --git a/p.haul-service b/p.haul-service
> index fbaa7ff..5d0a9e8 100755
> --- a/p.haul-service
> +++ b/p.haul-service
> @@ -6,10 +6,12 @@ import argparse
>  import logging
>  import phaul.xem_rpc as ph_xem_rpc
>  import phaul.p_haul_service as ph_srv
> +import phaul.p_haul_connection
>  
>  parser = argparse.ArgumentParser("Process HAULer service server")
> -parser.add_argument("--bind-addr", help="IP to bind to", type=str, default="0.0.0.0")
> -parser.add_argument("--bind-port", help="Port to bind to", type=int, default=ph_xem_rpc.rpc_port)
> +parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
> +parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
> +parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
>  parser.add_argument("--log-file", help="Write logging messages to specified file")
>  
>  args = parser.parse_args()
> @@ -23,15 +25,18 @@ def log_uncaught_exception(type, value, traceback):
>  	logging.error(value, exc_info=(type, value, traceback))
>  sys.excepthook = log_uncaught_exception
>  
> -host = (args.bind_addr, args.bind_port)
> -
>  sfd = None
>  def fin(foo, bar):
>  	logging.info("Stop by %d", foo)
>  	sfd.close()
>  
> -logging.info("Starting p.haul rpyc service")
> -t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
> +logging.info("Starting p.haul service")
> +
> +# Establish connection
> +connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
> +	args.fdfs)
> +
> +t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, connection)
>  
>  # FIXME: Setup stop handlers
>  sfd = t.get_stop_fd()
> @@ -42,3 +47,6 @@ t.start()
>  signal.pause()
>  t.join()
>  logging.info("Bye!")
> +
> +# Close connection
> +connection.close()
> diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
> index d6b3a61..2212e0e 100644
> --- a/phaul/fs_haul_shared.py
> +++ b/phaul/fs_haul_shared.py
> @@ -8,7 +8,7 @@ class p_haul_fs:
>  	def __init__(self):
>  		logging.info("Initilized shared FS hauler")
>  
> -	def set_target_host(self, thost):
> +	def set_options(self, opts):
>  		pass
>  
>  	def set_work_dir(self, wdir):
> diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
> index 5d90cb9..a9bd559 100644
> --- a/phaul/fs_haul_subtree.py
> +++ b/phaul/fs_haul_subtree.py
> @@ -14,9 +14,10 @@ class p_haul_fs:
>  	def __init__(self, subtree_path):
>  		logging.info("Initialized subtree FS hauler (%s)", subtree_path)
>  		self.__root = subtree_path
> +		self.__thost = None
>  
> -	def set_target_host(self, thost):
> -		self.__thost = thost
> +	def set_options(self, opts):
> +		self.__thost = opts["to"]
>  
>  	def set_work_dir(self, wdir):
>  		self.__wdir = wdir
> diff --git a/phaul/p_haul_connection.py b/phaul/p_haul_connection.py
> new file mode 100644
> index 0000000..48b962a
> --- /dev/null
> +++ b/phaul/p_haul_connection.py
> @@ -0,0 +1,44 @@
> +#
> +# p.haul connection module contain logic needed to establish connection
> +# between p.haul and p.haul-service.
> +#
> +
> +import logging
> +import socket
> +import util
> +
> +class connection:
> +	"""p.haul connection
> +
> +	Class encapsulate connections reqired for p.haul work, including rpc socket
> +	(socket for RPC calls), memory socket (socket for c/r images migration) and
> +	fs socket (socket for disk migration).
> +	"""
> +
> +	def __init__(self, rpc_sk, mem_sk, fs_sk):
> +		self.rpc_sk = rpc_sk
> +		self.mem_sk = mem_sk
> +		self.fs_sk = fs_sk
> +
> +	def close(self):
> +		self.rpc_sk.close()
> +		self.mem_sk.close()
> +		self.fs_sk.close()
> +
> +def establish(fdrpc, fdmem, fdfs):
> +	"""Construct required socket objects from file descriptors
> +
> +	Expect that each file descriptor represent socket opened in blocking mode
> +	with domain AF_INET and type SOCK_STREAM.
> +	"""
> +
> +	logging.info("Use existing connections, fdrpc=%d fdmem=%d fdfs=%d", fdrpc,
> +		fdmem, fdfs)
> +
> +	rpc_sk = socket.fromfd(fdrpc, socket.AF_INET, socket.SOCK_STREAM)
> +	mem_sk = socket.fromfd(fdmem, socket.AF_INET, socket.SOCK_STREAM)
> +	fs_sk = socket.fromfd(fdfs, socket.AF_INET, socket.SOCK_STREAM)
> +
> +	util.set_cloexec(rpc_sk)
> +
> +	return connection(rpc_sk, mem_sk, fs_sk)
> diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
> index ba96de3..06049d3 100644
> --- a/phaul/p_haul_iters.py
> +++ b/phaul/p_haul_iters.py
> @@ -23,14 +23,14 @@ phaul_iter_min_size = 64
>  phaul_iter_grow_max = 10
>  
>  class phaul_iter_worker:
> -	def __init__(self, p_type, host):
> -		logging.info("Connecting to target host")
> -		self.target_host = xem_rpc_client.rpc_proxy(host)
> -		self.data_socket = self.target_host.open_socket("datask")
> +	def __init__(self, p_type, connection):
> +		self.connection = connection
> +		self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
>  
>  		logging.info("Setting up local")
> +		self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
>  		self.img = images.phaul_images("dmp")
> -		self.criu_connection = criu_api.criu_conn(self.data_socket)
> +
>  		self.htype = p_haul_type.get_src(p_type)
>  		if not self.htype:
>  			raise Exception("No htype driver found")
> @@ -40,7 +40,6 @@ class phaul_iter_worker:
>  			raise Exception("No FS driver found")
>  
>  		self.pid = self.htype.root_task_pid()
> -		self.fs.set_target_host(host[0])
>  
>  		logging.info("Setting up remote")
>  		self.target_host.setup(p_type)
> @@ -54,6 +53,7 @@ class phaul_iter_worker:
>  		self.criu_connection.shell_job(opts["shell_job"])
>  		self.img.set_options(opts)
>  		self.htype.set_options(opts)
> +		self.fs.set_options(opts)
>  		self.__force = opts["force"]
>  
>  	def validate_cpu(self):
> @@ -66,7 +66,7 @@ class phaul_iter_worker:
>  			raise Exception("Can't dump cpuinfo")
>  
>  		logging.info("\t`- Sending CPU info")
> -		self.img.send_cpuinfo(self.target_host, self.data_socket)
> +		self.img.send_cpuinfo(self.target_host, self.connection.mem_sk)
>  
>  		logging.info("\t`- Checking CPU info")
>  		if not self.target_host.check_cpuinfo():
> @@ -200,7 +200,7 @@ class phaul_iter_worker:
>  		logging.info("Final FS and images sync")
>  		self.fs.stop_migration()
>  		self.img.sync_imgs_to_target(self.target_host, self.htype,
> -			self.data_socket)
> +			self.connection.mem_sk)
>  
>  		logging.info("Asking target host to restore")
>  		self.target_host.restore_from_images()
> diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
> index 0aceafb..409487a 100644
> --- a/phaul/p_haul_service.py
> +++ b/phaul/p_haul_service.py
> @@ -10,23 +10,23 @@ import criu_req
>  import p_haul_type
>  
>  class phaul_service:
> -	def on_connect(self):
> -		logging.info("Connected")
> -		self.dump_iter = 0
> -		self.restored = False
> +	def __init__(self, mem_sk, fs_sk):
>  		self.criu_connection = None
> -		self.data_socket = None
> +		self.data_socket = mem_sk
> +		self._fs_sk = fs_sk
>  		self.img = None
>  		self.htype = None
> +		self.dump_iter = 0
> +		self.restored = False
> +
> +	def on_connect(self):
> +		logging.info("Connected")
>  
>  	def on_disconnect(self):
>  		logging.info("Disconnected")
>  		if self.criu_connection:
>  			self.criu_connection.close()
>  
> -		if self.data_socket:
> -			self.data_socket.close()
> -
>  		if self.htype and not self.restored:
>  			self.htype.umount()
>  
> @@ -36,10 +36,6 @@ class phaul_service:
>  				self.img.save_images()
>  			self.img.close()
>  
> -	def on_socket_open(self, sk, uname):
> -		self.data_socket = sk
> -		logging.info("Data socket (%s) accepted", uname)
> -
>  	def rpc_setup(self, htype_id):
>  		logging.info("Setting up service side %s", htype_id)
>  		self.img = images.phaul_images("rst")
> diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
> index 92963d6..bafd994 100644
> --- a/phaul/xem_rpc.py
> +++ b/phaul/xem_rpc.py
> @@ -9,7 +9,6 @@ import traceback
>  import logging
>  import util
>  
> -rpc_port = 12345
>  rpc_sk_buf = 16384
>  
>  RPC_CMD = 1
> @@ -26,19 +25,12 @@ class _rpc_server_sk:
>  	def fileno(self):
>  		return self._sk.fileno()
>  
> -	def hash_name(self):
> -		return self._sk.getpeername()
> -
> -	def get_name(self, mgr):
> -		return self.hash_name()
> -
>  	def work(self, mgr):
>  		raw_data = self._sk.recv(rpc_sk_buf)
>  		if not raw_data:
>  			mgr.remove(self)
>  			if self._master:
>  				self._master.on_disconnect()
> -			self._sk.close()
>  			return
>  
>  		data = eval(raw_data)
> @@ -62,30 +54,9 @@ class _rpc_server_sk:
>  		self._sk.send(raw_data)
>  
>  	def init_rpc(self, mgr, args):
> -		util.set_cloexec(self)
>  		self._master = mgr.make_master()
>  		self._master.on_connect(*args)
>  
> -	def pick_channel(self, mgr, hash_name, uname):
> -		sk = mgr.pick_sk(hash_name)
> -		if sk:
> -			self._master.on_socket_open(sk._sk, uname)
> -
> -class _rpc_server_ask:
> -	def __init__(self, host):
> -		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> -		sk.bind(host)
> -		sk.listen(8)
> -		self._sk = sk
> -		util.set_cloexec(self)
> -
> -	def fileno(self):
> -		return self._sk.fileno()
> -
> -	def work(self, mgr):
> -		sk, addr = self._sk.accept()
> -		mgr.add(_rpc_server_sk(sk))
> -
>  class _rpc_stop_fd:
>  	def __init__(self, fd):
>  		self._fd = fd
> @@ -97,35 +68,29 @@ class _rpc_stop_fd:
>  		mgr.stop()
>  
>  class _rpc_server_manager:
> -	def __init__(self, srv_class, host):
> +	def __init__(self, srv_class, connection):
>  		self._srv_class = srv_class
> -		self._sk_by_name = {}
> -		self._poll_list = [_rpc_server_ask(host)]
> +		self._connection = connection
> +		self._poll_list = []
>  		self._alive = True
>  
> -	def add(self, sk):
> -		self._sk_by_name[sk.hash_name()] = sk
> -		self._poll_list.append(sk)
> +		self.add(_rpc_server_sk(connection.rpc_sk))
>  
> -	def remove(self, sk):
> -		self._sk_by_name.pop(sk.hash_name())
> -		self._poll_list.remove(sk)
> +	def add(self, item):
> +		self._poll_list.append(item)
>  
> -	def pick_sk(self, hash_name):
> -		sk = self._sk_by_name.pop(hash_name, None)
> -		if sk:
> -			self._poll_list.remove(sk)
> -		return sk
> +	def remove(self, item):
> +		self._poll_list.remove(item)
>  
>  	def make_master(self):
> -		return self._srv_class()
> +		return self._srv_class(self._connection.mem_sk, self._connection.fs_sk)
>  
>  	def stop(self):
>  		self._alive = False
>  
>  	def loop(self, stop_fd):
>  		if stop_fd:
> -			self._poll_list.append(_rpc_stop_fd(stop_fd))
> +			self.add(_rpc_stop_fd(stop_fd))
>  
>  		while self._alive:
>  			r, w, x = select.select(self._poll_list, [], [])
> @@ -135,9 +100,9 @@ class _rpc_server_manager:
>  		logging.info("RPC Service stops")
>  
>  class rpc_threaded_srv(threading.Thread):
> -	def __init__(self, srv_class, host):
> +	def __init__(self, srv_class, connection):
>  		threading.Thread.__init__(self)
> -		self._mgr = _rpc_server_manager(srv_class, host)
> +		self._mgr = _rpc_server_manager(srv_class, connection)
>  		self._stop_fd = None
>  
>  	def run(self):
> diff --git a/phaul/xem_rpc_client.py b/phaul/xem_rpc_client.py
> index 07246c0..c6fe8c3 100644
> --- a/phaul/xem_rpc_client.py
> +++ b/phaul/xem_rpc_client.py
> @@ -29,24 +29,10 @@ class _rpc_proxy_caller:
>  			raise Exception("Proto resp error")
>  
>  class rpc_proxy:
> -	def __init__(self, conn, *args):
> -		self._srv = conn
> -		self._rpc_sk = self._make_sk()
> -		util.set_cloexec(self._rpc_sk)
> +	def __init__(self, sk, *args):
> +		self._rpc_sk = sk
>  		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "init_rpc")
>  		c(args)
>  
>  	def __getattr__(self, attr):
>  		return _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CALL, attr)
> -
> -	def _make_sk(self):
> -		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> -		sk.connect(self._srv)
> -		return sk
> -
> -	def open_socket(self, uname):
> -		sk = self._make_sk()
> -		host = _rpc_proxy_caller(sk, xem_rpc.RPC_CMD, "get_name")()
> -		c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "pick_channel")
> -		c(host, uname)
> -		return sk
> -- 
> 1.7.1
> 
> _______________________________________________
> CRIU mailing list
> CRIU at openvz.org
> https://lists.openvz.org/mailman/listinfo/criu


More information about the CRIU mailing list