[CRIU] [PATCH 1/4] p.haul: handle multiple disks in ploop fs hauler

Nikita Spiridonov nspiridonov at odin.com
Thu Dec 10 02:12:02 PST 2015


Add multi-disk migration support to the ploop fs hauler. p_haul_fs and
p_haul_fs_receiver now accept a list of active deltas, each paired with
its corresponding socket.

Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
---
 phaul/fs_haul_ploop.py |  101 ++++++++++++++++++++++++++++++++++++++---------
 phaul/p_haul_vz.py     |   29 ++------------
 phaul/service.py       |    4 +-
 3 files changed, 87 insertions(+), 47 deletions(-)

diff --git a/phaul/fs_haul_ploop.py b/phaul/fs_haul_ploop.py
index abfb052..dc115d3 100644
--- a/phaul/fs_haul_ploop.py
+++ b/phaul/fs_haul_ploop.py
@@ -2,21 +2,31 @@
 # ploop disk hauler
 #
 
+import os
 import logging
 import threading
 import libploop
 
 
+DDXML_FILENAME = "DiskDescriptor.xml"
+
+
 class p_haul_fs:
-	def __init__(self, ddxml_path, fs_sk):
-		"""Initialize ploop disk hauler
+	def __init__(self, deltas):
+		"""Initialize ploop disks hauler
 
-		Initialize ploop disk hauler with specified path to DiskDescriptor.xml
-		file and socket.
+		For each disk create libploop.ploopcopy object using path to disk
+		descriptor file and corresponding socket.
 		"""
 
-		logging.info("Initilized ploop hauler (%s)", ddxml_path)
-		self.__ploopcopy = libploop.ploopcopy(ddxml_path, fs_sk.fileno())
+		# Create libploop.ploopcopy objects, one per active ploop delta
+		self.__log_init_hauler(deltas)
+		self.__ploop_copies = []
+		for delta_path, delta_fd in deltas:
+			ddxml_path = self.__get_ddxml_path(delta_path)
+			self.__check_ddxml(ddxml_path)
+			self.__ploop_copies.append(
+				libploop.ploopcopy(ddxml_path, delta_fd))
 
 	def set_options(self, opts):
 		pass
@@ -25,35 +35,86 @@ class p_haul_fs:
 		pass
 
 	def start_migration(self):
-		self.__ploopcopy.copy_start()
+		for ploopcopy in self.__ploop_copies:
+			ploopcopy.copy_start()
 
 	def next_iteration(self):
-		self.__ploopcopy.copy_next_iteration()
+		for ploopcopy in self.__ploop_copies:
+			ploopcopy.copy_next_iteration()
 
 	def stop_migration(self):
-		self.__ploopcopy.copy_stop()
+		for ploopcopy in self.__ploop_copies:
+			ploopcopy.copy_stop()
 
 	def persistent_inodes(self):
 		"""Inode numbers do not change during ploop disk migration"""
 		return True
 
+	def __log_init_hauler(self, deltas):
+		logging.info("Initialize ploop hauler")
+		for delta in deltas:
+			logging.info("\t`- %s", delta[0])
+
+	def __get_ddxml_path(self, delta_path):
+		"""Get path to disk descriptor file by path to disk delta"""
+		return os.path.join(os.path.dirname(delta_path), DDXML_FILENAME)
+
+	def __check_ddxml(self, ddxml_path):
+		"""Check disk descriptor file exist"""
+		if not os.path.isfile(ddxml_path):
+			raise Exception("{0} file missing".format(ddxml_path))
 
-class p_haul_fs_receiver(threading.Thread):
-	def __init__(self, fname_path, fs_sk):
-		"""Initialize ploop disk receiver
 
-		Initialize ploop disk receiver with specified path to root.hds file
-		and socket.
+class p_haul_fs_receiver:
+	def __init__(self, deltas):
+		"""Initialize ploop disks receiver
+
+		For each disk create delta receiver object using path to active delta
+		of the ploop disk and corresponding socket.
 		"""
 
+		# Create delta_receiver objects, one per active ploop delta
+		self.__log_init_receiver(deltas)
+		self.__delta_receivers = []
+		for delta_path, delta_fd in deltas:
+			self.__check_delta(delta_path)
+			self.__delta_receivers.append(delta_receiver(delta_path, delta_fd))
+
+	def start_receive(self):
+		"""Start all delta receiver threads"""
+		for receiver in self.__delta_receivers:
+			receiver.start()
+
+	def stop_receive(self):
+		"""Join all delta receiver threads"""
+		for receiver in self.__delta_receivers:
+			receiver.join()
+
+	def __log_init_receiver(self, deltas):
+		logging.info("Initialize ploop receiver")
+		for delta in deltas:
+			logging.info("\t`- %s", delta[0])
+
+	def __check_delta(self, delta_path):
+		"""Check delta file don't exist and parent directory exist"""
+
+		delta_dir = os.path.dirname(delta_path)
+		if not os.path.isdir(delta_dir):
+			raise Exception("{0} directory missing".format(delta_dir))
+
+		if os.path.isfile(delta_path):
+			raise Exception("{0} already exist".format(delta_path))
+
+
+class delta_receiver(threading.Thread):
+	def __init__(self, delta_path, delta_fd):
+		"""Initialize ploop single active delta receiver"""
 		threading.Thread.__init__(self)
-		self.__fname_path = fname_path
-		self.__fs_sk = fs_sk
+		self.__path = delta_path
+		self.__fd = delta_fd
 
 	def run(self):
 		try:
-			logging.info("Started fs receiver")
-			libploop.ploopcopy_receiver(self.__fname_path,
-				self.__fs_sk.fileno())
+			libploop.ploopcopy_receiver(self.__path, self.__fd)
 		except:
-			logging.exception("Exception in p_haul_fs_receiver")
+			logging.exception("Exception in %s delta receiver", self.__path)
diff --git a/phaul/p_haul_vz.py b/phaul/p_haul_vz.py
index da62d2b..db13f2d 100644
--- a/phaul/p_haul_vz.py
+++ b/phaul/p_haul_vz.py
@@ -8,7 +8,6 @@ import shlex
 import logging
 import criu_cr
 import util
-import fs_haul_shared
 import fs_haul_ploop
 import pycriu.rpc
 
@@ -190,32 +189,12 @@ class p_haul_type:
 			self._fs_mounted = False
 
 	def get_fs(self, fs_sk=None):
-		rootfs = self.__get_priv_fs_name()
-		logging.info("CT is on %s", rootfs)
-		if rootfs == "nfs":
-			return fs_haul_shared.p_haul_fs()
-		elif rootfs == "ext3" or rootfs == "ext4":
-			ddxml_path = os.path.join(self._ct_priv, "root.hdd",
-				"DiskDescriptor.xml")
-			return fs_haul_ploop.p_haul_fs(ddxml_path, fs_sk)
-		else:
-			logging.error("Unknown CT FS")
-			return None
+		deltas = [(os.path.join(self._ct_priv, "root.hdd", "root.hds"), fs_sk)]
+		return fs_haul_ploop.p_haul_fs(deltas)
 
 	def get_fs_receiver(self, fs_sk=None):
-		rootfs = self.__get_priv_fs_name()
-		logging.info("CT is on %s", rootfs)
-		if rootfs == "ext3" or rootfs == "ext4":
-			fname_path = os.path.join(self._ct_priv, "root.hdd", "root.hds")
-			return fs_haul_ploop.p_haul_fs_receiver(fname_path, fs_sk)
-		else:
-			return None
-
-	def __get_priv_fs_name(self):
-		rootfs = util.path_to_fs(self._ct_priv)
-		if not rootfs:
-			raise Exception("CT is on unknown FS")
-		return rootfs
+		deltas = [(os.path.join(self._ct_priv, "root.hdd", "root.hds"), fs_sk)]
+		return fs_haul_ploop.p_haul_fs_receiver(deltas)
 
 	def restored(self, pid):
 		pass
diff --git a/phaul/service.py b/phaul/service.py
index b5ce48f..577278e 100644
--- a/phaul/service.py
+++ b/phaul/service.py
@@ -33,7 +33,7 @@ class phaul_service:
 
 		# Stop fs receiver if it is running
 		if self.__fs_receiver:
-			self.__fs_receiver.join()
+			self.__fs_receiver.stop_receive()
 
 		if self.img:
 			logging.info("Closing images")
@@ -51,7 +51,7 @@ class phaul_service:
 		# Create and start fs receiver if current p.haul module provide it
 		self.__fs_receiver = self.htype.get_fs_receiver(self._fs_sk)
 		if self.__fs_receiver:
-			self.__fs_receiver.start()
+			self.__fs_receiver.start_receive()
 
 	def rpc_set_options(self, opts):
 		self.criu_connection.set_options(opts)
-- 
1.7.1



More information about the CRIU mailing list