[CRIU] [PATCH 1/4] p.haul: handle multiple disks in ploop fs hauler

Pavel Emelyanov xemul at parallels.com
Fri Dec 11 05:45:10 PST 2015


On 12/10/2015 01:12 PM, Nikita Spiridonov wrote:
> Add multiple disks migration support to ploop fs hauler. Now
> p_haul_fs and p_haul_fs_receiver accept list of active deltas
> with corresponding sockets.

Please split the patch. First -- introduce the delta receiver class
and let the ploop hauler use only one instance of it. Then -- equip
the delta receiver and hauler with any additional stuff you need
(e.g. the __log_init_hauler thing). Then make the hauler use multiple
receivers.


> Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
> ---
>  phaul/fs_haul_ploop.py |  101 ++++++++++++++++++++++++++++++++++++++---------
>  phaul/p_haul_vz.py     |   29 ++------------
>  phaul/service.py       |    4 +-
>  3 files changed, 87 insertions(+), 47 deletions(-)
> 
> diff --git a/phaul/fs_haul_ploop.py b/phaul/fs_haul_ploop.py
> index abfb052..dc115d3 100644
> --- a/phaul/fs_haul_ploop.py
> +++ b/phaul/fs_haul_ploop.py
> @@ -2,21 +2,31 @@
>  # ploop disk hauler
>  #
>  
> +import os
>  import logging
>  import threading
>  import libploop
>  
>  
> +DDXML_FILENAME = "DiskDescriptor.xml"
> +
> +
>  class p_haul_fs:
> -	def __init__(self, ddxml_path, fs_sk):
> -		"""Initialize ploop disk hauler
> +	def __init__(self, deltas):
> +		"""Initialize ploop disks hauler
>  
> -		Initialize ploop disk hauler with specified path to DiskDescriptor.xml
> -		file and socket.
> +		For each disk create libploop.ploopcopy object using path to disk
> +		descriptor file and corresponding socket.
>  		"""
>  
> -		logging.info("Initilized ploop hauler (%s)", ddxml_path)
> -		self.__ploopcopy = libploop.ploopcopy(ddxml_path, fs_sk.fileno())
> +		# Create libploop.ploopcopy objects, one per active ploop delta
> +		self.__log_init_hauler(deltas)
> +		self.__ploop_copies = []
> +		for delta_path, delta_fd in deltas:
> +			ddxml_path = self.__get_ddxml_path(delta_path)
> +			self.__check_ddxml(ddxml_path)
> +			self.__ploop_copies.append(
> +				libploop.ploopcopy(ddxml_path, delta_fd))
>  
>  	def set_options(self, opts):
>  		pass
> @@ -25,35 +35,86 @@ class p_haul_fs:
>  		pass
>  
>  	def start_migration(self):
> -		self.__ploopcopy.copy_start()
> +		for ploopcopy in self.__ploop_copies:
> +			ploopcopy.copy_start()
>  
>  	def next_iteration(self):
> -		self.__ploopcopy.copy_next_iteration()
> +		for ploopcopy in self.__ploop_copies:
> +			ploopcopy.copy_next_iteration()
>  
>  	def stop_migration(self):
> -		self.__ploopcopy.copy_stop()
> +		for ploopcopy in self.__ploop_copies:
> +			ploopcopy.copy_stop()
>  
>  	def persistent_inodes(self):
>  		"""Inode numbers do not change during ploop disk migration"""
>  		return True
>  
> +	def __log_init_hauler(self, deltas):
> +		logging.info("Initialize ploop hauler")
> +		for delta in deltas:
> +			logging.info("\t`- %s", delta[0])
> +
> +	def __get_ddxml_path(self, delta_path):
> +		"""Get path to disk descriptor file by path to disk delta"""
> +		return os.path.join(os.path.dirname(delta_path), DDXML_FILENAME)
> +
> +	def __check_ddxml(self, ddxml_path):
> +		"""Check disk descriptor file exist"""
> +		if not os.path.isfile(ddxml_path):
> +			raise Exception("{0} file missing".format(ddxml_path))
>  
> -class p_haul_fs_receiver(threading.Thread):
> -	def __init__(self, fname_path, fs_sk):
> -		"""Initialize ploop disk receiver
>  
> -		Initialize ploop disk receiver with specified path to root.hds file
> -		and socket.
> +class p_haul_fs_receiver:
> +	def __init__(self, deltas):
> +		"""Initialize ploop disks receiver
> +
> +		For each disk create delta receiver object using path to active delta
> +		of the ploop disk and corresponding socket.
>  		"""
>  
> +		# Create delta_receiver objects, one per active ploop delta
> +		self.__log_init_receiver(deltas)
> +		self.__delta_receivers = []
> +		for delta_path, delta_fd in deltas:
> +			self.__check_delta(delta_path)
> +			self.__delta_receivers.append(delta_receiver(delta_path, delta_fd))
> +
> +	def start_receive(self):
> +		"""Start all delta receiver threads"""
> +		for receiver in self.__delta_receivers:
> +			receiver.start()
> +
> +	def stop_receive(self):
> +		"""Join all delta receiver threads"""
> +		for receiver in self.__delta_receivers:
> +			receiver.join()
> +
> +	def __log_init_receiver(self, deltas):
> +		logging.info("Initialize ploop receiver")
> +		for delta in deltas:
> +			logging.info("\t`- %s", delta[0])
> +
> +	def __check_delta(self, delta_path):
> +		"""Check delta file don't exist and parent directory exist"""
> +
> +		delta_dir = os.path.dirname(delta_path)
> +		if not os.path.isdir(delta_dir):
> +			raise Exception("{0} directory missing".format(delta_dir))
> +
> +		if os.path.isfile(delta_path):
> +			raise Exception("{0} already exist".format(delta_path))
> +
> +
> +class delta_receiver(threading.Thread):
> +	def __init__(self, delta_path, delta_fd):
> +		"""Initialize ploop single active delta receiver"""
>  		threading.Thread.__init__(self)
> -		self.__fname_path = fname_path
> -		self.__fs_sk = fs_sk
> +		self.__path = delta_path
> +		self.__fd = delta_fd
>  
>  	def run(self):
>  		try:
> -			logging.info("Started fs receiver")
> -			libploop.ploopcopy_receiver(self.__fname_path,
> -				self.__fs_sk.fileno())
> +			libploop.ploopcopy_receiver(self.__path, self.__fd)
>  		except:
> -			logging.exception("Exception in p_haul_fs_receiver")
> +			logging.exception("Exception in %s delta receiver", self.__path)
> diff --git a/phaul/p_haul_vz.py b/phaul/p_haul_vz.py
> index da62d2b..db13f2d 100644
> --- a/phaul/p_haul_vz.py
> +++ b/phaul/p_haul_vz.py
> @@ -8,7 +8,6 @@ import shlex
>  import logging
>  import criu_cr
>  import util
> -import fs_haul_shared
>  import fs_haul_ploop
>  import pycriu.rpc
>  
> @@ -190,32 +189,12 @@ class p_haul_type:
>  			self._fs_mounted = False
>  
>  	def get_fs(self, fs_sk=None):
> -		rootfs = self.__get_priv_fs_name()
> -		logging.info("CT is on %s", rootfs)
> -		if rootfs == "nfs":
> -			return fs_haul_shared.p_haul_fs()
> -		elif rootfs == "ext3" or rootfs == "ext4":
> -			ddxml_path = os.path.join(self._ct_priv, "root.hdd",
> -				"DiskDescriptor.xml")
> -			return fs_haul_ploop.p_haul_fs(ddxml_path, fs_sk)
> -		else:
> -			logging.error("Unknown CT FS")
> -			return None
> +		deltas = [(os.path.join(self._ct_priv, "root.hdd", "root.hds"), fs_sk)]
> +		return fs_haul_ploop.p_haul_fs(deltas)
>  
>  	def get_fs_receiver(self, fs_sk=None):
> -		rootfs = self.__get_priv_fs_name()
> -		logging.info("CT is on %s", rootfs)
> -		if rootfs == "ext3" or rootfs == "ext4":
> -			fname_path = os.path.join(self._ct_priv, "root.hdd", "root.hds")
> -			return fs_haul_ploop.p_haul_fs_receiver(fname_path, fs_sk)
> -		else:
> -			return None
> -
> -	def __get_priv_fs_name(self):
> -		rootfs = util.path_to_fs(self._ct_priv)
> -		if not rootfs:
> -			raise Exception("CT is on unknown FS")
> -		return rootfs
> +		deltas = [(os.path.join(self._ct_priv, "root.hdd", "root.hds"), fs_sk)]
> +		return fs_haul_ploop.p_haul_fs_receiver(deltas)
>  
>  	def restored(self, pid):
>  		pass
> diff --git a/phaul/service.py b/phaul/service.py
> index b5ce48f..577278e 100644
> --- a/phaul/service.py
> +++ b/phaul/service.py
> @@ -33,7 +33,7 @@ class phaul_service:
>  
>  		# Stop fs receiver if it is running
>  		if self.__fs_receiver:
> -			self.__fs_receiver.join()
> +			self.__fs_receiver.stop_receive()
>  
>  		if self.img:
>  			logging.info("Closing images")
> @@ -51,7 +51,7 @@ class phaul_service:
>  		# Create and start fs receiver if current p.haul module provide it
>  		self.__fs_receiver = self.htype.get_fs_receiver(self._fs_sk)
>  		if self.__fs_receiver:
> -			self.__fs_receiver.start()
> +			self.__fs_receiver.start_receive()
>  
>  	def rpc_set_options(self, opts):
>  		self.criu_connection.set_options(opts)
> 



More information about the CRIU mailing list