[CRIU] [PATCH 11/11] p.haul: implement migration in so-called restart mode
Nikita Spiridonov
nspiridonov at virtuozzo.com
Mon Mar 21 09:35:26 PDT 2016
Implement restart migration mode which migrate fs to target host
iteratively while possible, stop process tree on source host and
start it on target host. C/r not used in this mode.
Signed-off-by: Nikita Spiridonov <nspiridonov at virtuozzo.com>
---
phaul/criu_api.py | 2 +-
phaul/iters.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++------
phaul/service.py | 35 +++++++++++-----
3 files changed, 127 insertions(+), 25 deletions(-)
diff --git a/phaul/criu_api.py b/phaul/criu_api.py
index 2bfbf65..72ed256 100644
--- a/phaul/criu_api.py
+++ b/phaul/criu_api.py
@@ -31,7 +31,7 @@ class criu_conn:
self._shell_job = False
css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
util.set_cloexec(css[1])
- logging.info("\t`- Passing (ctl:%d, data:%d) pair to CRIU", css[0].fileno(), mem_sk.fileno())
+ logging.info("Passing (ctl:%d, data:%d) pair to CRIU", css[0].fileno(), mem_sk.fileno())
self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
css[0].close()
self._cs = css[1]
diff --git a/phaul/iters.py b/phaul/iters.py
index 5916244..cbac565 100644
--- a/phaul/iters.py
+++ b/phaul/iters.py
@@ -22,6 +22,16 @@ PRE_DUMP_DISABLE = False
PRE_DUMP_ENABLE = True
+def is_live_mode(mode):
+ """Check is migration running in live mode"""
+ return mode == MIGRATION_MODE_LIVE
+
+
+def is_restart_mode(mode):
+ """Check is migration running in restart mode"""
+ return mode == MIGRATION_MODE_RESTART
+
+
class iter_consts:
"""Constants for iterations management"""
@@ -45,9 +55,6 @@ class phaul_iter_worker:
self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
logging.info("Setting up local")
- self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
- self.img = images.phaul_images("dmp")
-
self.htype = htype.get_src(p_type)
if not self.htype:
raise Exception("No htype driver found")
@@ -56,8 +63,14 @@ class phaul_iter_worker:
if not self.fs:
raise Exception("No FS driver found")
+ self.img = None
+ self.criu_connection = None
+ if is_live_mode(self.__mode):
+ self.img = images.phaul_images("dmp")
+ self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
+
logging.info("Setting up remote")
- self.target_host.setup(p_type)
+ self.target_host.setup(p_type, mode)
def get_target_host(self):
return self.target_host
@@ -65,11 +78,13 @@ class phaul_iter_worker:
def set_options(self, opts):
self.__force = opts["force"]
self.__pre_dump = opts["pre_dump"]
- self.target_host.set_options(opts)
- self.criu_connection.set_options(opts)
- self.img.set_options(opts)
self.htype.set_options(opts)
self.fs.set_options(opts)
+ if self.img:
+ self.img.set_options(opts)
+ if self.criu_connection:
+ self.criu_connection.set_options(opts)
+ self.target_host.set_options(opts)
def __validate_cpu(self):
if self.__force:
@@ -127,9 +142,9 @@ class phaul_iter_worker:
def start_migration(self):
logging.info("Start migration in %s mode", self.__mode)
- if self.__mode == MIGRATION_MODE_LIVE:
+ if is_live_mode(self.__mode):
self.__start_live_migration()
- elif self.__mode == MIGRATION_MODE_RESTART:
+ elif is_restart_mode(self.__mode):
self.__start_restart_migration()
else:
raise Exception("Unknown migration mode")
@@ -196,9 +211,9 @@ class phaul_iter_worker:
# Restore htype on target
logging.info("Asking target host to restore")
self.target_host.restore_from_images()
+ logging.info("Restored on target host")
# Ack previous dump request to terminate all frozen tasks
- logging.info("Restored on target host")
resp = self.criu_connection.ack_notify()
if not resp.success:
raise Exception("Dump screwed up")
@@ -220,7 +235,53 @@ class phaul_iter_worker:
tree on source host and start it on target host.
"""
- raise Exception("Not implemented")
+ migration_stats = mstats.restart_stats()
+ migration_stats.handle_start()
+
+ # Handle preliminary FS migration
+ logging.info("Preliminary FS migration")
+ fsstats = self.fs.start_migration()
+ migration_stats.handle_preliminary(fsstats)
+
+ iter_index = 0
+ prev_fsstats = None
+
+ while True:
+
+ # Handle FS migration iteration
+ logging.info("* Iteration %d", iter_index)
+ fsstats = self.fs.next_iteration()
+ migration_stats.handle_iteration(fsstats)
+
+ # Decide whether we continue iteration or stop and do final sync
+ if not self.__check_restart_iter_progress(iter_index, fsstats, prev_fsstats):
+ break
+
+ iter_index += 1
+ prev_fsstats = fsstats
+
+ # Stop htype on source and leave it mounted
+ logging.info("Final stop and start")
+ self.htype.stop(False)
+
+ try:
+ # Handle final FS sync on mounted htype
+ logging.info("Final FS sync")
+ fsstats = self.fs.stop_migration()
+ migration_stats.handle_iteration(fsstats)
+
+ # Start htype on target
+ logging.info("Asking target host to start")
+ self.target_host.start_htype()
+ logging.info("Started on target host")
+
+ except:
+ self.htype.start()
+ raise
+
+ logging.info("Migration succeeded")
+ self.htype.umount()
+ migration_stats.handle_stop()
def __check_live_iter_progress(self, index, dstats, prev_dstats):
@@ -231,9 +292,9 @@ class phaul_iter_worker:
return False
if prev_dstats:
- w_add = dstats.pages_written - prev_dstats.pages_written
- w_add = w_add * 100 / prev_dstats.pages_written
- if w_add > iter_consts.MAX_ITER_GROW_RATE:
+ grow_rate = self.__calc_grow_rate(dstats.pages_written,
+ prev_dstats.pages_written)
+ if grow_rate > iter_consts.MAX_ITER_GROW_RATE:
logging.info("\t> Iteration grows")
return False
@@ -243,3 +304,29 @@ class phaul_iter_worker:
logging.info("\t> Proceed to next iteration")
return True
+
+ def __check_restart_iter_progress(self, index, fsstats, prev_fsstats):
+
+ logging.info("Checking iteration progress:")
+
+ if fsstats.bytes_xferred <= iter_consts.MIN_ITER_FS_XFER_BYTES:
+ logging.info("\t> Small fs transfer")
+ return False
+
+ if prev_fsstats:
+ grow_rate = self.__calc_grow_rate(fsstats.bytes_xferred,
+ prev_fsstats.bytes_xferred)
+ if grow_rate > iter_consts.MAX_ITER_GROW_RATE:
+ logging.info("\t> Iteration grows")
+ return False
+
+ if index >= iter_consts.MAX_ITERS_COUNT:
+ logging.info("\t> Too many iterations")
+ return False
+
+ logging.info("\t> Proceed to next iteration")
+ return True
+
+ def __calc_grow_rate(self, value, prev_value):
+ delta = value - prev_value
+ return delta * 100 / prev_value
diff --git a/phaul/service.py b/phaul/service.py
index 5a3a380..ecb0cc8 100644
--- a/phaul/service.py
+++ b/phaul/service.py
@@ -7,15 +7,17 @@ import images
import criu_api
import criu_req
import htype
+import iters
class phaul_service:
def __init__(self, connection):
self.connection = connection
- self.criu_connection = None
- self.img = None
self.htype = None
self.__fs_receiver = None
+ self.criu_connection = None
+ self.img = None
+ self.__mode = iters.MIGRATION_MODE_LIVE
self.dump_iter_index = 0
self.restored = False
@@ -28,9 +30,11 @@ class phaul_service:
self.criu_connection.close()
if self.htype and not self.restored:
- self.htype.umount()
+ if iters.is_live_mode(self.__mode):
+ self.htype.umount()
+ elif iters.is_restart_mode(self.__mode):
+ self.htype.stop(True)
- # Stop fs receiver if it is running
if self.__fs_receiver:
self.__fs_receiver.stop_receive()
@@ -40,22 +44,27 @@ class phaul_service:
self.img.save_images()
self.img.close()
- def rpc_setup(self, htype_id):
+ def rpc_setup(self, htype_id, mode):
+
logging.info("Setting up service side %s", htype_id)
- self.img = images.phaul_images("rst")
+ self.__mode = mode
- self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
self.htype = htype.get_dst(htype_id)
- # Create and start fs receiver if current p.haul module provide it
self.__fs_receiver = self.htype.get_fs_receiver(self.connection.fdfs)
if self.__fs_receiver:
self.__fs_receiver.start_receive()
+ if iters.is_live_mode(self.__mode):
+ self.img = images.phaul_images("rst")
+ self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
+
def rpc_set_options(self, opts):
- self.criu_connection.set_options(opts)
- self.img.set_options(opts)
self.htype.set_options(opts)
+ if self.criu_connection:
+ self.criu_connection.set_options(opts)
+ if self.img:
+ self.img.set_options(opts)
def start_page_server(self):
logging.info("Starting page server for iter %d", self.dump_iter_index)
@@ -100,3 +109,9 @@ class phaul_service:
def rpc_restore_time(self):
stats = criu_api.criu_get_rstats(self.img)
return stats.restore_time
+
+ def rpc_start_htype(self):
+ logging.info("Starting")
+ self.htype.start()
+ logging.info("Start succeeded")
+ self.restored = True
--
1.7.1
More information about the CRIU
mailing list