[CRIU] [PATCH 11/11] p.haul: implement migration in so-called restart mode

Nikita Spiridonov nspiridonov at virtuozzo.com
Mon Mar 21 09:35:26 PDT 2016


Implement restart migration mode, which migrates the fs to the target host
iteratively while possible, then stops the process tree on the source host
and starts it on the target host. C/r is not used in this mode.

Signed-off-by: Nikita Spiridonov <nspiridonov at virtuozzo.com>
---
 phaul/criu_api.py |    2 +-
 phaul/iters.py    |  115 ++++++++++++++++++++++++++++++++++++++++++++++------
 phaul/service.py  |   35 +++++++++++-----
 3 files changed, 127 insertions(+), 25 deletions(-)

diff --git a/phaul/criu_api.py b/phaul/criu_api.py
index 2bfbf65..72ed256 100644
--- a/phaul/criu_api.py
+++ b/phaul/criu_api.py
@@ -31,7 +31,7 @@ class criu_conn:
 		self._shell_job = False
 		css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
 		util.set_cloexec(css[1])
-		logging.info("\t`- Passing (ctl:%d, data:%d) pair to CRIU", css[0].fileno(), mem_sk.fileno())
+		logging.info("Passing (ctl:%d, data:%d) pair to CRIU", css[0].fileno(), mem_sk.fileno())
 		self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
 		css[0].close()
 		self._cs = css[1]
diff --git a/phaul/iters.py b/phaul/iters.py
index 5916244..cbac565 100644
--- a/phaul/iters.py
+++ b/phaul/iters.py
@@ -22,6 +22,16 @@ PRE_DUMP_DISABLE = False
 PRE_DUMP_ENABLE = True
 
 
+def is_live_mode(mode):
+	"""Check is migration running in live mode"""
+	return mode == MIGRATION_MODE_LIVE
+
+
+def is_restart_mode(mode):
+	"""Check is migration running in restart mode"""
+	return mode == MIGRATION_MODE_RESTART
+
+
 class iter_consts:
 	"""Constants for iterations management"""
 
@@ -45,9 +55,6 @@ class phaul_iter_worker:
 		self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
 
 		logging.info("Setting up local")
-		self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
-		self.img = images.phaul_images("dmp")
-
 		self.htype = htype.get_src(p_type)
 		if not self.htype:
 			raise Exception("No htype driver found")
@@ -56,8 +63,14 @@ class phaul_iter_worker:
 		if not self.fs:
 			raise Exception("No FS driver found")
 
+		self.img = None
+		self.criu_connection = None
+		if is_live_mode(self.__mode):
+			self.img = images.phaul_images("dmp")
+			self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
+
 		logging.info("Setting up remote")
-		self.target_host.setup(p_type)
+		self.target_host.setup(p_type, mode)
 
 	def get_target_host(self):
 		return self.target_host
@@ -65,11 +78,13 @@ class phaul_iter_worker:
 	def set_options(self, opts):
 		self.__force = opts["force"]
 		self.__pre_dump = opts["pre_dump"]
-		self.target_host.set_options(opts)
-		self.criu_connection.set_options(opts)
-		self.img.set_options(opts)
 		self.htype.set_options(opts)
 		self.fs.set_options(opts)
+		if self.img:
+			self.img.set_options(opts)
+		if self.criu_connection:
+			self.criu_connection.set_options(opts)
+		self.target_host.set_options(opts)
 
 	def __validate_cpu(self):
 		if self.__force:
@@ -127,9 +142,9 @@ class phaul_iter_worker:
 
 	def start_migration(self):
 		logging.info("Start migration in %s mode", self.__mode)
-		if self.__mode == MIGRATION_MODE_LIVE:
+		if is_live_mode(self.__mode):
 			self.__start_live_migration()
-		elif self.__mode == MIGRATION_MODE_RESTART:
+		elif is_restart_mode(self.__mode):
 			self.__start_restart_migration()
 		else:
 			raise Exception("Unknown migration mode")
@@ -196,9 +211,9 @@ class phaul_iter_worker:
 		# Restore htype on target
 		logging.info("Asking target host to restore")
 		self.target_host.restore_from_images()
+		logging.info("Restored on target host")
 
 		# Ack previous dump request to terminate all frozen tasks
-		logging.info("Restored on target host")
 		resp = self.criu_connection.ack_notify()
 		if not resp.success:
 			raise Exception("Dump screwed up")
@@ -220,7 +235,53 @@ class phaul_iter_worker:
 		tree on source host and start it on target host.
 		"""
 
-		raise Exception("Not implemented")
+		migration_stats = mstats.restart_stats()
+		migration_stats.handle_start()
+
+		# Handle preliminary FS migration
+		logging.info("Preliminary FS migration")
+		fsstats = self.fs.start_migration()
+		migration_stats.handle_preliminary(fsstats)
+
+		iter_index = 0
+		prev_fsstats = None
+
+		while True:
+
+			# Handle FS migration iteration
+			logging.info("* Iteration %d", iter_index)
+			fsstats = self.fs.next_iteration()
+			migration_stats.handle_iteration(fsstats)
+
+			# Decide whether we continue iteration or stop and do final sync
+			if not self.__check_restart_iter_progress(iter_index, fsstats, prev_fsstats):
+				break
+
+			iter_index += 1
+			prev_fsstats = fsstats
+
+		# Stop htype on source and leave it mounted
+		logging.info("Final stop and start")
+		self.htype.stop(False)
+
+		try:
+			# Handle final FS sync on mounted htype
+			logging.info("Final FS sync")
+			fsstats = self.fs.stop_migration()
+			migration_stats.handle_iteration(fsstats)
+
+			# Start htype on target
+			logging.info("Asking target host to start")
+			self.target_host.start_htype()
+			logging.info("Started on target host")
+
+		except:
+			self.htype.start()
+			raise
+
+		logging.info("Migration succeeded")
+		self.htype.umount()
+		migration_stats.handle_stop()
 
 	def __check_live_iter_progress(self, index, dstats, prev_dstats):
 
@@ -231,9 +292,9 @@ class phaul_iter_worker:
 			return False
 
 		if prev_dstats:
-			w_add = dstats.pages_written - prev_dstats.pages_written
-			w_add = w_add * 100 / prev_dstats.pages_written
-			if w_add > iter_consts.MAX_ITER_GROW_RATE:
+			grow_rate = self.__calc_grow_rate(dstats.pages_written,
+				prev_dstats.pages_written)
+			if grow_rate > iter_consts.MAX_ITER_GROW_RATE:
 				logging.info("\t> Iteration grows")
 				return False
 
@@ -243,3 +304,29 @@ class phaul_iter_worker:
 
 		logging.info("\t> Proceed to next iteration")
 		return True
+
+	def __check_restart_iter_progress(self, index, fsstats, prev_fsstats):
+
+		logging.info("Checking iteration progress:")
+
+		if fsstats.bytes_xferred <= iter_consts.MIN_ITER_FS_XFER_BYTES:
+			logging.info("\t> Small fs transfer")
+			return False
+
+		if prev_fsstats:
+			grow_rate = self.__calc_grow_rate(fsstats.bytes_xferred,
+				prev_fsstats.bytes_xferred)
+			if grow_rate > iter_consts.MAX_ITER_GROW_RATE:
+				logging.info("\t> Iteration grows")
+				return False
+
+		if index >= iter_consts.MAX_ITERS_COUNT:
+			logging.info("\t> Too many iterations")
+			return False
+
+		logging.info("\t> Proceed to next iteration")
+		return True
+
+	def __calc_grow_rate(self, value, prev_value):
+		delta = value - prev_value
+		return delta * 100 / prev_value
diff --git a/phaul/service.py b/phaul/service.py
index 5a3a380..ecb0cc8 100644
--- a/phaul/service.py
+++ b/phaul/service.py
@@ -7,15 +7,17 @@ import images
 import criu_api
 import criu_req
 import htype
+import iters
 
 
 class phaul_service:
 	def __init__(self, connection):
 		self.connection = connection
-		self.criu_connection = None
-		self.img = None
 		self.htype = None
 		self.__fs_receiver = None
+		self.criu_connection = None
+		self.img = None
+		self.__mode = iters.MIGRATION_MODE_LIVE
 		self.dump_iter_index = 0
 		self.restored = False
 
@@ -28,9 +30,11 @@ class phaul_service:
 			self.criu_connection.close()
 
 		if self.htype and not self.restored:
-			self.htype.umount()
+			if iters.is_live_mode(self.__mode):
+				self.htype.umount()
+			elif iters.is_restart_mode(self.__mode):
+				self.htype.stop(True)
 
-		# Stop fs receiver if it is running
 		if self.__fs_receiver:
 			self.__fs_receiver.stop_receive()
 
@@ -40,22 +44,27 @@ class phaul_service:
 				self.img.save_images()
 			self.img.close()
 
-	def rpc_setup(self, htype_id):
+	def rpc_setup(self, htype_id, mode):
+
 		logging.info("Setting up service side %s", htype_id)
-		self.img = images.phaul_images("rst")
+		self.__mode = mode
 
-		self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
 		self.htype = htype.get_dst(htype_id)
 
-		# Create and start fs receiver if current p.haul module provide it
 		self.__fs_receiver = self.htype.get_fs_receiver(self.connection.fdfs)
 		if self.__fs_receiver:
 			self.__fs_receiver.start_receive()
 
+		if iters.is_live_mode(self.__mode):
+			self.img = images.phaul_images("rst")
+			self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
+
 	def rpc_set_options(self, opts):
-		self.criu_connection.set_options(opts)
-		self.img.set_options(opts)
 		self.htype.set_options(opts)
+		if self.criu_connection:
+			self.criu_connection.set_options(opts)
+		if self.img:
+			self.img.set_options(opts)
 
 	def start_page_server(self):
 		logging.info("Starting page server for iter %d", self.dump_iter_index)
@@ -100,3 +109,9 @@ class phaul_service:
 	def rpc_restore_time(self):
 		stats = criu_api.criu_get_rstats(self.img)
 		return stats.restore_time
+
+	def rpc_start_htype(self):
+		logging.info("Starting")
+		self.htype.start()
+		logging.info("Start succeeded")
+		self.restored = True
-- 
1.7.1



More information about the CRIU mailing list