[CRIU] [PATCH 1/3] p.haul: move auxiliary stuff into phaul package
Ruslan Kuprieiev
kupruser at gmail.com
Sun Nov 2 17:02:49 PST 2014
Let's store all non-executable p.haul sources inside the phaul package,
so it can be easily installed and used.
Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
.gitignore | 2 -
Makefile | 4 +-
criu_api.py | 109 ----------------------
fs_haul_shared.py | 28 ------
fs_haul_subtree.py | 50 ----------
images.py | 176 -----------------------------------
mstats.py | 33 -------
p.haul | 6 +-
p.haul-service | 4 +-
p_haul_cgroup.py | 79 ----------------
p_haul_iters.py | 232 -----------------------------------------------
p_haul_lxc.py | 166 ---------------------------------
p_haul_ovz.py | 190 --------------------------------------
p_haul_pid.py | 84 -----------------
p_haul_service.py | 154 -------------------------------
p_haul_type.py | 33 -------
phaul/.gitignore | 2 +
phaul/__init__.py | 17 ++++
phaul/criu_api.py | 109 ++++++++++++++++++++++
phaul/fs_haul_shared.py | 28 ++++++
phaul/fs_haul_subtree.py | 50 ++++++++++
phaul/images.py | 176 +++++++++++++++++++++++++++++++++++
phaul/mstats.py | 33 +++++++
phaul/p_haul_cgroup.py | 79 ++++++++++++++++
phaul/p_haul_iters.py | 232 +++++++++++++++++++++++++++++++++++++++++++++++
phaul/p_haul_lxc.py | 166 +++++++++++++++++++++++++++++++++
phaul/p_haul_ovz.py | 190 ++++++++++++++++++++++++++++++++++++++
phaul/p_haul_pid.py | 84 +++++++++++++++++
phaul/p_haul_service.py | 154 +++++++++++++++++++++++++++++++
phaul/p_haul_type.py | 33 +++++++
phaul/util.py | 46 ++++++++++
phaul/xem_rpc.py | 198 ++++++++++++++++++++++++++++++++++++++++
util.py | 46 ----------
xem_rpc.py | 198 ----------------------------------------
34 files changed, 1604 insertions(+), 1587 deletions(-)
delete mode 100644 criu_api.py
delete mode 100644 fs_haul_shared.py
delete mode 100644 fs_haul_subtree.py
delete mode 100644 images.py
delete mode 100644 mstats.py
delete mode 100644 p_haul_cgroup.py
delete mode 100644 p_haul_iters.py
delete mode 100644 p_haul_lxc.py
delete mode 100644 p_haul_ovz.py
delete mode 100644 p_haul_pid.py
delete mode 100644 p_haul_service.py
delete mode 100644 p_haul_type.py
create mode 100644 phaul/.gitignore
create mode 100644 phaul/__init__.py
create mode 100644 phaul/criu_api.py
create mode 100644 phaul/fs_haul_shared.py
create mode 100644 phaul/fs_haul_subtree.py
create mode 100644 phaul/images.py
create mode 100644 phaul/mstats.py
create mode 100644 phaul/p_haul_cgroup.py
create mode 100644 phaul/p_haul_iters.py
create mode 100644 phaul/p_haul_lxc.py
create mode 100644 phaul/p_haul_ovz.py
create mode 100644 phaul/p_haul_pid.py
create mode 100644 phaul/p_haul_service.py
create mode 100644 phaul/p_haul_type.py
create mode 100644 phaul/util.py
create mode 100644 phaul/xem_rpc.py
delete mode 100644 util.py
delete mode 100644 xem_rpc.py
diff --git a/.gitignore b/.gitignore
index 941b808..0d20b64 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1 @@
*.pyc
-rpc_pb2.py
-stats_pb2.py
diff --git a/Makefile b/Makefile
index 6f7bde5..4513658 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ all: rpc_pb2.py stats_pb2.py
.PHONY: all
rpc_pb2.py: rpc.proto
- protoc --proto_path=. --python_out=. rpc.proto
+ protoc --proto_path=. --python_out=./phaul rpc.proto
stats_pb2.py: stats.proto
- protoc --proto_path=. --python_out=. stats.proto
+ protoc --proto_path=. --python_out=./phaul stats.proto
diff --git a/criu_api.py b/criu_api.py
deleted file mode 100644
index 146088a..0000000
--- a/criu_api.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# CRIU API
-# Includes class to work with CRIU service and helpers
-#
-
-import socket
-import struct
-import os
-import util
-import subprocess
-import rpc_pb2 as cr_rpc
-import stats_pb2 as crs
-
-criu_binary = "criu"
-
-req_types = {
- cr_rpc.DUMP: "dump",
- cr_rpc.PRE_DUMP: "pre_dump",
- cr_rpc.PAGE_SERVER: "page_server",
- cr_rpc.RESTORE: "restore",
- cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
- cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
-}
-
-cpuinfo_img_name = "cpuinfo.img"
-
-def_verb = 2
-
-#
-# Connection to CRIU service
-#
-
-class criu_conn:
- def __init__(self, mem_sk):
- self._iter = 0
- self.verb = def_verb
- css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
- util.set_cloexec(css[1])
- print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
- self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
- css[0].close()
- self._cs = css[1]
- self._last_req = -1
- self._mem_fd = mem_sk.fileno()
-
- def close(self):
- self._cs.close()
- self._swrk.wait()
-
- def mem_sk_fileno(self):
- return self._mem_fd
-
- def verbose(self, level):
- self.verb = level
-
- def _recv_resp(self):
- resp = cr_rpc.criu_resp()
- resp.ParseFromString(self._cs.recv(1024))
- if not resp.type in (cr_rpc.NOTIFY, self._last_req):
- raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
-
- return resp
-
- def send_req(self, req):
- req.opts.log_level = self.verb
- req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
- self._cs.send(req.SerializeToString())
- self._iter += 1
- self._last_req = req.type
-
- return self._recv_resp()
-
- def ack_notify(self, success = True):
- req = cr_rpc.criu_req()
- req.type = cr_rpc.NOTIFY
- req.notify_success = True
- self._cs.send(req.SerializeToString())
-
- return self._recv_resp()
-
-#
-# Helper to read CRIU-generated statistics
-#
-
-CRIU_STATS_MAGIC = 0x57093306
-
-def criu_get_stats(img, file_name):
- s = struct.Struct("I I")
- f = open(os.path.join(img.work_dir(), file_name))
- #
- # Stats file is 4 butes of magic, then 4 bytes with
- # stats packet size
- #
- v = s.unpack(f.read(s.size))
- if v[0] != CRIU_STATS_MAGIC:
- raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
-
- stats = crs.stats_entry()
- stats.ParseFromString(f.read(v[1]))
-
- return stats
-
-def criu_get_dstats(img):
- stats = criu_get_stats(img, "stats-dump")
- return stats.dump
-
-def criu_get_rstats(img):
- stats = criu_get_stats(img, "stats-restore")
- return stats.restore
diff --git a/fs_haul_shared.py b/fs_haul_shared.py
deleted file mode 100644
index 54e231a..0000000
--- a/fs_haul_shared.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Shared FS hauler (noop)
-#
-
-class p_haul_fs:
- def __init__(self):
- print "Initilized shared FS hauler"
- pass
-
- def set_target_host(self, thost):
- pass
-
- def set_work_dir(self, wdir):
- pass
-
- def start_migration(self):
- pass
-
- def next_iteration(self):
- pass
-
- def stop_migration(self):
- pass
-
- # Inode numbers do not change on this FS
- # during migration
- def persistent_inodes(self):
- return True
diff --git a/fs_haul_subtree.py b/fs_haul_subtree.py
deleted file mode 100644
index 3181c79..0000000
--- a/fs_haul_subtree.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# FS haul driver, that copies the subtree from
-# one node to another using rsync. It's used in
-# legacy OpenVZ configurations.
-#
-
-import subprocess as sp
-import os
-
-rsync_log_file = "rsync.log"
-
-class p_haul_fs:
- def __init__(self, subtree_path):
- print "Initialized subtree FS hauler (%s)" % subtree_path
- self.__root = subtree_path
- pass
-
- def set_target_host(self, thost):
- self.__thost = thost
-
- def set_work_dir(self, wdir):
- self.__wdir = wdir
-
- def __run_rsync(self):
- logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
- dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
-
- # First rsync might be very long. Wait for it not
- # to produce big pause between the 1st pre-dump and
- # .stop_migration
-
- ret = sp.call(["rsync", "-a", self.__root, dst],
- stdout = logf, stderr = logf)
- if ret != 0:
- raise Exception("Rsync failed")
-
- def start_migration(self):
- print "Starting FS migration"
- self.__run_rsync()
-
- def next_iteration(self):
- pass
-
- def stop_migration(self):
- print "Doing final FS sync"
- self.__run_rsync()
-
- # When rsync-ing FS inodes number will change
- def persistent_inodes(self):
- return False
diff --git a/images.py b/images.py
deleted file mode 100644
index a4d465c..0000000
--- a/images.py
+++ /dev/null
@@ -1,176 +0,0 @@
-#
-# images driver for migration (without FS transfer)
-#
-
-import os
-import tempfile
-import tarfile
-import time
-import shutil
-import time
-import threading
-import util
-import criu_api
-
-def_path = "/var/local/p.haul-fs/"
-
-class opendir:
- def __init__(self, path):
- self._dirname = path
- self._dirfd = os.open(path, os.O_DIRECTORY)
- util.set_cloexec(self)
-
- def close(self):
- os.close(self._dirfd)
- os._dirname = None
- os._dirfd = -1
-
- def name(self):
- return self._dirname
-
- def fileno(self):
- return self._dirfd
-
-class untar_thread(threading.Thread):
- def __init__(self, sk, tdir):
- threading.Thread.__init__(self)
- self.__sk = sk
- self.__dir = tdir
-
- def run(self):
- tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
- tf.extractall(self.__dir)
- tf.close()
-
-class img_tar:
- def __init__(self, sk, dirname):
- self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
- self.__dir = dirname
-
- def add(self, img, path = None):
- if not path:
- path = os.path.join(self.__dir, img)
-
- self.__tf.add(path, img)
-
- def close(self):
- self.__tf.close()
-
-class phaul_images:
- WDIR = 1
- IMGDIR = 2
-
- def __init__(self, typ):
- self.current_iter = 0
- self.sync_time = 0.0
- self._typ = typ
- self._keep_on_close = False
- self._wdir = None
- self._current_dir = None
-
- def save_images(self):
- print "Keeping images"
- self._keep_on_close = True
-
- def set_options(self, opts):
- self._keep_on_close = opts["keep_images"]
-
- suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
- util.makedirs(opts["img_path"])
- wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
- self._wdir = opendir(wdir)
- self._img_path = os.path.join(self._wdir.name(), "img")
- os.mkdir(self._img_path)
-
- def close(self):
- if not self._wdir:
- return
-
- self._wdir.close()
- if self._current_dir:
- self._current_dir.close()
-
- if not self._keep_on_close:
- print "Removing images"
- shutil.rmtree(self._wdir.name())
- else:
- print "Images are kept in %s" % self._wdir.name()
- pass
-
- def img_sync_time(self):
- return self.sync_time
-
- def new_image_dir(self):
- if self._current_dir:
- self._current_dir.close()
- self.current_iter += 1
- img_dir = "%s/%d" % (self._img_path, self.current_iter)
- print "\tMaking directory %s" % img_dir
- os.mkdir(img_dir)
- self._current_dir = opendir(img_dir)
-
- def image_dir_fd(self):
- return self._current_dir.fileno()
-
- def work_dir_fd(self):
- return self._wdir.fileno()
-
- def image_dir(self):
- return self._current_dir.name()
-
- def work_dir(self):
- return self._wdir.name()
-
- def prev_image_dir(self):
- if self.current_iter == 1:
- return None
- else:
- return "../%d" % (self.current_iter - 1)
-
- # Images transfer
- # Are there better ways for doing this?
-
- def sync_imgs_to_target(self, th, htype, sock):
- # Pre-dump doesn't generate any images (yet?)
- # so copy only those from the top dir
- print "Sending images to target"
-
- start = time.time()
- cdir = self.image_dir()
-
- th.start_accept_images(phaul_images.IMGDIR)
- tf = img_tar(sock, cdir)
-
- print "\tPack"
- for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
- tf.add(img)
-
- print "\tAdd htype images"
- for himg in htype.get_meta_images(cdir):
- tf.add(himg[1], himg[0])
-
- tf.close()
- th.stop_accept_images()
-
- self.sync_time = time.time() - start
-
- def send_cpuinfo(self, th, sock):
- th.start_accept_images(phaul_images.WDIR)
- tf = img_tar(sock, self.work_dir())
- tf.add(criu_api.cpuinfo_img_name)
- tf.close()
- th.stop_accept_images()
-
- def start_accept_images(self, dir_id, sk):
- if dir_id == phaul_images.WDIR:
- dirname = self.work_dir()
- else:
- dirname = self.image_dir()
-
- self.__acc_tar = untar_thread(sk, dirname)
- self.__acc_tar.start()
- print "Started images server"
-
- def stop_accept_images(self):
- print "Waiting for images to unpack"
- self.__acc_tar.join()
diff --git a/mstats.py b/mstats.py
deleted file mode 100644
index f1a9c32..0000000
--- a/mstats.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import time
-
-def usec2sec(usec):
- return usec / 1000000.
-
-class migration_stats:
- def __init__(self):
- self._iter_fr_times = []
- self._frozen_time = 0
-
- def start(self):
- self._start_time = time.time()
-
- def stop(self, iters):
- self._rst_time = iters.th.restore_time()
- self._img_sync_time = iters.img.img_sync_time()
- self._end_time = time.time()
-
- self._print_stats()
-
- def iteration(self, stats):
- print "Dumped %d pages, %d skipped" % \
- (stats.pages_written, stats.pages_skipped_parent)
-
- self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
- self._frozen_time += stats.frozen_time
-
- def _print_stats(self):
- print "Migration succeeded"
- print "\t total time is ~%.2lf sec" % (self._end_time - self._start_time)
- print "\t frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
- print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
- print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/p.haul b/p.haul
index 8c7f181..29b4c49 100755
--- a/p.haul
+++ b/p.haul
@@ -1,9 +1,9 @@
#!/usr/bin/env python
import sys
import argparse
-import p_haul_iters as ph_iters
-import images
-import criu_api
+from phaul import p_haul_iters as ph_iters
+from phaul import images
+from phaul import criu_api
# Usage idea
# p.haul <type> <id> <destination>
diff --git a/p.haul-service b/p.haul-service
index cdeff54..1f6eccb 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -1,8 +1,8 @@
#!/usr/bin/env python
import signal
-import xem_rpc
-import p_haul_service as ph_srv
+from phaul import xem_rpc
+from phaul import p_haul_service as ph_srv
if __name__ == "__main__":
sfd = None
diff --git a/p_haul_cgroup.py b/p_haul_cgroup.py
deleted file mode 100644
index 846ee62..0000000
--- a/p_haul_cgroup.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# CGroups manipulations for p.haul.
-#
-# FIXME Isn't it nicer to do it via libcgroup?
-#
-
-import os
-
-cg_root_dir = "/sys/fs/cgroup"
-cg_tasks_file = "tasks"
-
-def cg_line_parse(ln):
- items = ln.split(":")
- #
- # If two controllers are merged tigether, we see
- # their names comma-separated in proc. The problem
- # is that the respective directory name in sysfsis
- # (!) IS NOT THE SAME, controller names can go
- # reversed.
- #
- # That said, we just use the first name component,
- # in sysfs there would the respective symlink
- #
- cname = items[1].split(",")[0]
- cdir = items[2]
-
- return cname, cdir
-
-def dump_hier(pid, img):
- print "\tSave CG for %d into %s" % (pid, img)
- fd = open(img, "w")
- cg = open("/proc/%d/cgroup" % pid)
- for ln in cg:
- cg_controller, cg_dir = cg_line_parse(ln)
- if not cg_controller.startswith("name="):
- fd.write("%s%s" % (cg_controller, cg_dir))
-
- cg.close()
- fd.close()
-
-#
-# The cpuset controller is unusable before at least one
-# cpu and memory node is set there. For restore it's OK
-# to copy parent masks into it, at the end we'll apply
-# "real" CT config
-#
-
-def cpuset_copy_parent(path, c):
- c = "cpuset.%s" % c
- ppath = os.path.dirname(path)
- pfd = open(os.path.join(ppath, c))
- cfd = open(os.path.join(path, c), "w")
- cfd.write(pfd.read())
- cfd.close()
- pfd.close()
-
-def cpuset_allow_all(path):
- cpuset_copy_parent(path, "cpus")
- cpuset_copy_parent(path, "mems")
-
-def restore_one_controller(pid, ln):
- cg_path = os.path.join(cg_root_dir, ln.strip())
- print "[%s]" % cg_path
- if not os.access(cg_path, os.F_OK):
- os.makedirs(cg_path)
- if ln.startswith("cpuset"):
- cpuset_allow_all(cg_path)
-
- tf = open(os.path.join(cg_path, cg_tasks_file), "w")
- tf.write("%d" % pid)
- tf.close()
-
-def restore_hier(pid, img):
- print "\tCreate hier for %d from %s" % (pid, img)
- fd = open(img)
- for ln in fd:
- restore_one_controller(pid, ln)
- fd.close()
- pass
diff --git a/p_haul_iters.py b/p_haul_iters.py
deleted file mode 100644
index 0463e78..0000000
--- a/p_haul_iters.py
+++ /dev/null
@@ -1,232 +0,0 @@
-#
-# The P.HAUL core -- the class that drives migration
-#
-
-import images
-import mstats
-import xem_rpc
-import rpc_pb2 as cr_rpc
-import criu_api
-import p_haul_type
-
-# Constants for iterations management
-#
-# Maximum number of iterations
-phaul_iter_max = 8
-# If we dump less than this amount of pages we abort
-# iterations and go do the full dump
-phaul_iter_min_size = 64
-# Each iteration should dump less pages or at most
-# this % more than previous
-phaul_iter_grow_max = 10
-
-class phaul_iter_worker:
- def __init__(self, p_type, host):
- self._mstat = mstats.migration_stats()
- self.iteration = 0
- self.prev_stats = None
-
- print "Connecting to target host"
- self.th = xem_rpc.rpc_proxy(host)
- self.data_sk = self.th.open_socket("datask")
-
- print "Setting up local"
- self.img = images.phaul_images("dmp")
- self.criu = criu_api.criu_conn(self.data_sk)
- self.htype = p_haul_type.get_src(p_type)
- if not self.htype:
- raise Exception("No htype driver found")
-
- self.fs = self.htype.get_fs()
- if not self.fs:
- raise Exception("No FS driver found")
-
- self.pid = self.htype.root_task_pid()
- self.fs.set_target_host(host)
-
- print "Setting up remote"
- self.th.setup(p_type)
-
-
- def make_dump_req(self, typ):
- #
- # Prepare generic request for (pre)dump
- #
-
- req = cr_rpc.criu_req()
- req.type = typ
- req.opts.pid = self.pid
- req.opts.ps.fd = self.criu.mem_sk_fileno()
- req.opts.track_mem = True
-
- req.opts.images_dir_fd = self.img.image_dir_fd()
- req.opts.work_dir_fd = self.img.work_dir_fd()
- p_img = self.img.prev_image_dir()
- if p_img:
- req.opts.parent_img = p_img
- if not self.fs.persistent_inodes():
- req.opts.force_irmap = True
-
- return req
-
- def set_options(self, opts):
- self.th.set_options(opts)
- self.criu.verbose(opts["verbose"])
- self.img.set_options(opts)
- self.htype.set_options(opts)
- self.__force = opts["force"]
-
- def validate_cpu(self):
- print "Checking CPU compatibility"
-
- print " `- Dumping CPU info"
- req = cr_rpc.criu_req()
- req.type = cr_rpc.CPUINFO_DUMP
- req.opts.images_dir_fd = self.img.work_dir_fd()
- req.keep_open = True
- resp = self.criu.send_req(req)
- if not resp.success:
- raise Exception("Can't dump cpuinfo")
-
- print " `- Sending CPU info"
- self.img.send_cpuinfo(self.th, self.data_sk)
-
- print " `- Checking CPU info"
- if not self.th.check_cpuinfo():
- raise Exception("CPUs mismatch")
-
- def start_migration(self):
- self._mstat.start()
-
- if not self.__force:
- self.validate_cpu()
-
- print "Preliminary FS migration"
- self.fs.set_work_dir(self.img.work_dir())
- self.fs.start_migration()
-
- print "Starting iterations"
- cc = self.criu
-
- while True:
- print "* Iteration %d" % self.iteration
-
- self.th.start_iter()
- self.img.new_image_dir()
-
- print "\tIssuing pre-dump command to service"
-
- req = self.make_dump_req(cr_rpc.PRE_DUMP)
- resp = cc.send_req(req)
- if not resp.success:
- raise Exception("Pre-dump failed")
-
- print "\tPre-dump succeeded"
-
- self.th.end_iter()
-
- stats = criu_api.criu_get_dstats(self.img)
- self._mstat.iteration(stats)
-
- #
- # Need to decide whether we do next iteration
- # or stop on the existing and go do full dump
- # and restore
- #
-
- print "Checking iteration progress:"
-
- if stats.pages_written <= phaul_iter_min_size:
- print "\t> Small dump"
- break;
-
- if self.prev_stats:
- w_add = stats.pages_written - self.prev_stats.pages_written
- w_add = w_add * 100 / self.prev_stats.pages_written
- if w_add > phaul_iter_grow_max:
- print "\t> Iteration grows"
- break
-
- if self.iteration >= phaul_iter_max:
- print "\t> Too many iterations"
- break
-
- self.iteration += 1
- self.prev_stats = stats
- print "\t> Proceed to next iteration"
-
- self.fs.next_iteration()
-
- #
- # Finish with iterations -- do full dump, send images
- # to target host and restore from them there
- #
-
- print "Final dump and restore"
-
- self.th.start_iter()
- self.img.new_image_dir()
-
- print "\tIssuing dump command to service"
- req = self.make_dump_req(cr_rpc.DUMP)
- req.opts.notify_scripts = True
- req.opts.file_locks = True
- req.opts.evasive_devices = True
- req.opts.link_remap = True
- if self.htype.can_migrate_tcp():
- req.opts.tcp_established = True
-
- resp = cc.send_req(req)
- while True:
- if resp.type != cr_rpc.NOTIFY:
- raise Exception("Dump failed")
-
- if resp.notify.script == "post-dump":
- #
- # Dump is effectively over. Now CRIU
- # waits for us to do whatever we want
- # and keeps the tasks frozen.
- #
- break
-
- elif resp.notify.script == "network-lock":
- self.htype.net_lock()
- elif resp.notify.script == "network-unlock":
- self.htype.net_unlock()
-
- print "\t\tNotify (%s)" % resp.notify.script
- resp = cc.ack_notify()
-
- print "Dump complete"
- self.th.end_iter()
-
- #
- # Dump is complete -- go to target node,
- # restore them there and kill (if required)
- # tasks on source node
- #
-
- print "Final FS and images sync"
- self.fs.stop_migration()
- self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
-
- print "Asking target host to restore"
- self.th.restore_from_images()
-
- #
- # Ack the notify after restore -- CRIU would
- # then terminate all tasks and send us back
- # DUMP/success message
- #
-
- resp = cc.ack_notify()
- if not resp.success:
- raise Exception("Dump screwed up")
-
- self.htype.umount()
-
- stats = criu_api.criu_get_dstats(self.img)
- self._mstat.iteration(stats)
- self._mstat.stop(self)
- self.img.close()
- cc.close()
diff --git a/p_haul_lxc.py b/p_haul_lxc.py
deleted file mode 100644
index 27df698..0000000
--- a/p_haul_lxc.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#
-# LinuX Containers hauler module
-#
-
-import os
-import shutil
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-from subprocess import Popen, PIPE
-
-name = "lxc"
-lxc_dir = "/var/lib/lxc/"
-lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
-cg_image_name = "lxccg.img"
-
-class p_haul_type:
- def __init__(self, name):
- self._ctname = name
- #
- # This list would contain (v_in, v_out, v_br) tuples where
- # v_in is the name of veth device in CT
- # v_out is its peer on the host
- # v_bridge is the bridge to which thie veth is attached
- #
- self._veths = []
- self._cfg = {}
-
- def __load_ct_config(self):
- print "Loading config file from %s" % self.__ct_config()
-
- self._cfg = {}
- self._veths = []
-
- veth = None
-
- ifd = open(self.__ct_config())
- for line in ifd:
- if not ("=" in line):
- continue
- k, v = map(lambda a: a.strip(), line.split("=", 1))
- self._cfg[k] = v
-
- if k == "lxc.network.type":
- if v != "veth":
- raise Exception("Unsupported network device type: %s", v)
- if veth:
- self._veths.append(veth)
- veth = util.net_dev()
- elif k == "lxc.network.link":
- veth.link = v
- elif k == "lxc.network.name":
- veth.name = v
- elif k == "lxc.network.veth.pair":
- veth.pair = v
- if veth:
- self._veths.append(veth)
- ifd.close()
-
- def __apply_cg_config(self):
- print "Applying CT configs"
- # FIXME -- implement
- pass
-
- def init_src(self):
- self._fs_mounted = True
- self._bridged = True
- self.__load_ct_config()
-
- def init_dst(self):
- self._fs_mounted = False
- self._bridged = False
- self.__load_ct_config()
-
- def set_options(self, opts):
- pass
-
- def root_task_pid(self):
- pid = -1;
-
- pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
- for l in pd.stdout:
- if l.startswith("PID:"):
- pid = int(l.split(":")[1])
- status = pd.wait()
- if status:
- raise Exception("lxc info -n %s failed: %d" %
- (self._ctname, status))
- if pid == -1:
- raise Exception("CT isn't running")
- return pid
-
- def __ct_rootfs(self):
- return self._cfg['lxc.rootfs']
-
- def __ct_root(self):
- return os.path.join(lxc_rootfs_dir, self._ctname)
-
- def __ct_config(self):
- return os.path.join(lxc_dir, self._ctname, "config")
-
- #
- # Meta-images for LXC -- container config and info about CGroups
- #
- def get_meta_images(self, dir):
- cg_img = os.path.join(dir, cg_image_name)
- p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
- cfg_name = self.__ct_config()
- return [ (cfg_name, "config"),
- (cg_img, cg_image_name) ]
-
- def put_meta_images(self, dir):
- print "Putting config file into %s" % lxc_dir
-
- shutil.copy(os.path.join(dir, "config"), self.__ct_config())
-
- # Keep this name, we'll need one in prepare_ct()
- self.cg_img = os.path.join(dir, cg_image_name)
-
- #
- # Create cgroup hierarchy and put root task into it
- # Hierarchy is unlimited, we will apply config limitations
- # in ->restored->__apply_cg_config later
- #
- def prepare_ct(self, pid):
- p_haul_cgroup.restore_hier(pid, self.cg_img)
-
- def mount(self):
- nroot = self.__ct_root()
- print "Mounting CT root to %s" % nroot
- if not os.access(nroot, os.F_OK):
- os.makedirs(nroot)
- os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
- self._fs_mounted = True
- return nroot
-
- def get_fs(self):
- return fs_haul_shared.p_haul_fs()
-
- def restored(self, pid):
- self.__apply_cg_config()
-
- def net_lock(self):
- for veth in self._veths:
- util.ifdown(veth.pair)
-
- def net_unlock(self):
- for veth in self._veths:
- util.ifup(veth.pair)
- if veth.link and not self._bridged:
- util.bridge_add(veth.pair, veth.link)
-
- def can_migrate_tcp(self):
- return True
-
- def umount(self):
- pass
-
- def veths(self):
- #
- # Caller wants to see list of tuples with [0] being name
- # in CT and [1] being name on host. Just return existing
- # tuples, the [2] with bridge name wouldn't hurt
- #
- return self._veths
diff --git a/p_haul_ovz.py b/p_haul_ovz.py
deleted file mode 100644
index 44ddbb5..0000000
--- a/p_haul_ovz.py
+++ /dev/null
@@ -1,190 +0,0 @@
-#
-# OpenVZ containers hauler module
-#
-
-import os
-import shutil
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-
-name = "ovz"
-vzpid_dir = "/var/lib/vzctl/vepid/"
-vz_dir = "/vz"
-vzpriv_dir = "%s/private" % vz_dir
-vzroot_dir = "%s/root" % vz_dir
-vz_conf_dir = "/etc/vz/conf/"
-vz_pidfiles = "/var/lib/vzctl/vepid/"
-cg_image_name = "ovzcg.img"
-
-class p_haul_type:
- def __init__(self, ctid):
- self._ctid = ctid
- #
- # This list would contain (v_in, v_out, v_br) tuples where
- # v_in is the name of veth device in CT
- # v_out is its peer on the host
- # v_bridge is the bridge to which thie veth is attached
- #
- self._veths = []
- self._cfg = []
-
- def __load_ct_config(self, path):
- print "Loading config file from %s" % path
- ifd = open(os.path.join(path, self.__ct_config()))
- for line in ifd:
- self._cfg.append(line)
-
- if line.startswith("NETIF="):
- #
- # Parse and keep veth pairs, later we will
- # equip restore request with this data and
- # will use it while (un)locking the network
- #
- v_in = None
- v_out = None
- v_bridge = None
- vs = line.strip().split("=", 1)[1].strip("\"")
- for parm in vs.split(","):
- pa = parm.split("=")
- if pa[0] == "ifname":
- v_in = pa[1]
- elif pa[0] == "host_ifname":
- v_out = pa[1]
- elif pa[0] == "bridge":
- v_bridge = pa[1]
-
- if v_in and v_out:
- print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
- veth = util.net_dev()
- veth.name = v_in
- veth.pair = v_out
- veth.link = v_bridge
- self._veths.append(veth)
-
- ifd.close()
-
- def __apply_cg_config(self):
- print "Applying CT configs"
- # FIXME -- implement
- pass
-
- def init_src(self):
- self._fs_mounted = True
- self._bridged = True
- self.__load_ct_config(vz_conf_dir)
-
- def init_dst(self):
- self._fs_mounted = False
- self._bridged = False
-
- def set_options(self, opts):
- pass
-
- def root_task_pid(self):
- pf = open(os.path.join(vzpid_dir, self._ctid))
- pid = pf.read()
- return int(pid)
-
- def __ct_priv(self):
- return "%s/%s" % (vzpriv_dir, self._ctid)
-
- def __ct_root(self):
- return "%s/%s" % (vzroot_dir, self._ctid)
-
- def __ct_config(self):
- return "%s.conf" % self._ctid
-
- #
- # Meta-images for OVZ -- container config and info about CGroups
- #
- def get_meta_images(self, path):
- cg_img = os.path.join(path, cg_image_name)
- p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
- cfg_name = self.__ct_config()
- return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
- (cg_img, cg_image_name) ]
-
- def put_meta_images(self, path):
- print "Putting config file into %s" % vz_conf_dir
-
- self.__load_ct_config(path)
- ofd = open(os.path.join(vz_conf_dir, self.__ct_config()), "w")
- ofd.writelines(self._cfg)
- ofd.close()
-
- # Keep this name, we'll need one in prepare_ct()
- self.cg_img = os.path.join(path, cg_image_name)
-
- #
- # Create cgroup hierarchy and put root task into it
- # Hierarchy is unlimited, we will apply config limitations
- # in ->restored->__apply_cg_config later
- #
- def prepare_ct(self, pid):
- p_haul_cgroup.restore_hier(pid, self.cg_img)
-
- def __umount_root(self):
- print "Umounting CT root"
- os.system("umount %s" % self.__ct_root())
- self._fs_mounted = False
-
- def mount(self):
- nroot = self.__ct_root()
- print "Mounting CT root to %s" % nroot
- if not os.access(nroot, os.F_OK):
- os.makedirs(nroot)
- os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
- self._fs_mounted = True
- return nroot
-
- def umount(self):
- if self._fs_mounted:
- self.__umount_root()
-
- def get_fs(self):
- rootfs = util.path_to_fs(self.__ct_priv())
- if not rootfs:
- print "CT is on unknown FS"
- return None
-
- print "CT is on %s" % rootfs
-
- if rootfs == "nfs":
- return fs_haul_shared.p_haul_fs()
- if rootfs == "ext3" or rootfs == "ext4":
- return fs_haul_subtree.p_haul_fs(self.__ct_priv())
-
- print "Unknown CT FS"
- return None
-
- def restored(self, pid):
- print "Writing pidfile"
- pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
- pidfile.write("%d" % pid)
- pidfile.close()
-
- self.__apply_cg_config()
-
- def net_lock(self):
- for veth in self._veths:
- util.ifdown(veth.pair)
-
- def net_unlock(self):
- for veth in self._veths:
- util.ifup(veth.pair)
- if veth.link and not self._bridged:
- util.bridge_add(veth.pair, veth.link)
-
- def can_migrate_tcp(self):
- return True
-
- def veths(self):
- #
- # Caller wants to see list of tuples with [0] being name
- # in CT and [1] being name on host. Just return existing
- # tuples, the [2] with bridge name wouldn't hurt
- #
- return self._veths
-
diff --git a/p_haul_pid.py b/p_haul_pid.py
deleted file mode 100644
index f0051b0..0000000
--- a/p_haul_pid.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Individual process hauler
-#
-
-import fs_haul_shared
-
-name = "pid"
-
-class p_haul_type:
- def __init__(self, id):
- self.pid = int(id)
- self._pidfile = None
-
-
- #
- # Initialize itself for source node or destination one
- #
- def init_src(self):
- pass
- def init_dst(self):
- pass
-
- def set_options(self, opts):
- self._pidfile = opts["dst_rpid"]
- self._fs_root = opts["pid_root"]
-
- # Report the pid of the root task of what we're
- # goung to migrate
- def root_task_pid(self):
- return self.pid
-
- # Prepare filesystem before restoring. Retrun
- # the new root, if required. 'None' will mean
- # that things will get restored in the current
- # mount namespace and w/o chroot
- def mount(self):
- return self._fs_root
-
- # Remove any specific FS setup
- def umount(self):
- pass
-
- # Get driver for FS migration
- def get_fs(self):
- return fs_haul_shared.p_haul_fs()
-
- # Get list of files which should be copied to
- # the destination node. The dir argument is where
- # temporary stuff can be put
- def get_meta_images(self, dir):
- return []
-
- # Take your files from dir and put in whatever
- # places are appropriate. Paths (relative) are
- # preserved.
- def put_meta_images(self, dir):
- pass
-
- # Things are started to get restored, only the
- # first task (with pid @pid) is created.
- def prepare_ct(self, pid):
- pass
-
- # Restoring done, the new top task has pid pid
- def restored(self, pid):
- if self._pidfile:
- print "Writing rst pidfile"
- open(self._pidfile, "w").writelines(["%d" % pid])
-
- #
- # Lock and unlock networking
- #
- def net_lock(self):
- pass
-
- def net_unlock(self):
- pass
-
- def can_migrate_tcp(self):
- return False
-
- # Get list of veth pairs if any
- def veths(self):
- return []
diff --git a/p_haul_service.py b/p_haul_service.py
deleted file mode 100644
index 5587bd7..0000000
--- a/p_haul_service.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#
-# P.HAUL code, that helps on the target node (rpyc service)
-#
-
-import xem_rpc
-import rpc_pb2 as cr_rpc
-import images
-import criu_api
-import p_haul_type
-
-class phaul_service:
- def on_connect(self):
- print "Connected"
- self.dump_iter = 0
- self.restored = False
- self.criu = None
- self.data_sk = None
- self.img = None
- self.htype = None
-
- def on_disconnect(self):
- print "Disconnected"
- if self.criu:
- self.criu.close()
-
- if self.data_sk:
- self.data_sk.close()
-
- if self.htype and not self.restored:
- self.htype.umount()
-
- if self.img:
- print "Closing images"
- if not self.restored:
- self.img.save_images()
- self.img.close()
-
- def on_socket_open(self, sk, uname):
- self.data_sk = sk
- print "Data socket (%s) accepted" % uname
-
- def rpc_setup(self, htype_id):
- print "Setting up service side", htype_id
- self.img = images.phaul_images("rst")
- self.criu = criu_api.criu_conn(self.data_sk)
- self.htype = p_haul_type.get_dst(htype_id)
-
- def rpc_set_options(self, opts):
- self.criu.verbose(opts["verbose"])
- self.img.set_options(opts)
- self.htype.set_options(opts)
-
- def start_page_server(self):
- print "Starting page server for iter %d" % self.dump_iter
-
- req = cr_rpc.criu_req()
- req.type = cr_rpc.PAGE_SERVER
- req.keep_open = True
- req.opts.ps.fd = self.criu.mem_sk_fileno()
-
- req.opts.images_dir_fd = self.img.image_dir_fd()
- req.opts.work_dir_fd = self.img.work_dir_fd()
- p_img = self.img.prev_image_dir()
- if p_img:
- req.opts.parent_img = p_img
-
- print "\tSending criu rpc req"
- resp = self.criu.send_req(req)
- if not resp.success:
- raise Exception("Failed to start page server")
-
- print "\tPage server started at %d" % resp.ps.pid
-
- def rpc_start_iter(self):
- self.dump_iter += 1
- self.img.new_image_dir()
- self.start_page_server()
-
- def rpc_end_iter(self):
- pass
-
- def rpc_start_accept_images(self, dir_id):
- self.img.start_accept_images(dir_id, self.data_sk)
-
- def rpc_stop_accept_images(self):
- self.img.stop_accept_images()
-
- def rpc_check_cpuinfo(self):
- print "Checking cpuinfo"
- req = cr_rpc.criu_req()
- req.type = cr_rpc.CPUINFO_CHECK
- req.opts.images_dir_fd = self.img.work_dir_fd()
- req.keep_open = True
- resp = self.criu.send_req(req)
- print " `-", resp.success
- return resp.success
-
- def rpc_restore_from_images(self):
- print "Restoring from images"
- self.htype.put_meta_images(self.img.image_dir())
-
- req = cr_rpc.criu_req()
- req.type = cr_rpc.RESTORE
- req.opts.images_dir_fd = self.img.image_dir_fd()
- req.opts.work_dir_fd = self.img.work_dir_fd()
- req.opts.notify_scripts = True
-
- if self.htype.can_migrate_tcp():
- req.opts.tcp_established = True
-
- for veth in self.htype.veths():
- v = req.opts.veths.add()
- v.if_in = veth.name
- v.if_out = veth.pair
-
- nroot = self.htype.mount()
- if nroot:
- req.opts.root = nroot
- print "Restore root set to %s" % req.opts.root
-
- cc = self.criu
- resp = cc.send_req(req)
- while True:
- if resp.type == cr_rpc.NOTIFY:
- print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
- if resp.notify.script == "setup-namespaces":
- #
- # At that point we have only one task
- # living in namespaces and waiting for
- # us to ACK the notify. Htype might want
- # to configure namespace (external net
- # devices) and cgroups
- #
- self.htype.prepare_ct(resp.notify.pid)
- elif resp.notify.script == "network-unlock":
- self.htype.net_unlock()
- elif resp.notify.script == "network-lock":
- raise Exception("Locking network on restore?")
-
- resp = cc.ack_notify()
- continue
-
- if not resp.success:
- raise Exception("Restore failed")
-
- print "Restore succeeded"
- break
-
- self.htype.restored(resp.restore.pid)
- self.restored = True
-
- def rpc_restore_time(self):
- stats = criu_api.criu_get_rstats(self.img)
- return stats.restore_time
diff --git a/p_haul_type.py b/p_haul_type.py
deleted file mode 100644
index 6ac093d..0000000
--- a/p_haul_type.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Haulers' type selector
-# Types (htypes) are classes, that help hauling processes.
-# See p_haul_pid for comments of how a class should look like.
-#
-
-import p_haul_ovz
-import p_haul_pid
-import p_haul_lxc
-
-haul_types = {
- p_haul_ovz.name: p_haul_ovz,
- p_haul_pid.name: p_haul_pid,
- p_haul_lxc.name: p_haul_lxc,
-}
-
-def __get(id):
- if haul_types.has_key(id[0]):
- h_type = haul_types[id[0]]
- return h_type.p_haul_type(id[1])
- else:
- print "Unknown type. Try one of", haul_types.keys()
- return None
-
-def get_src(id):
- ht = __get(id)
- ht.init_src()
- return ht
-
-def get_dst(id):
- ht = __get(id)
- ht.init_dst()
- return ht
diff --git a/phaul/.gitignore b/phaul/.gitignore
new file mode 100644
index 0000000..c8c86ef
--- /dev/null
+++ b/phaul/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*_pb2.py
diff --git a/phaul/__init__.py b/phaul/__init__.py
new file mode 100644
index 0000000..2ed0820
--- /dev/null
+++ b/phaul/__init__.py
@@ -0,0 +1,17 @@
+__all__ = [
+ "criu_api",
+ "fs_haul_shared",
+ "fs_haul_subtree",
+ "images",
+ "mstats",
+ "p_haul_cgroup",
+ "p_haul_iters",
+ "p_haul_lxc",
+ "p_haul_ovz",
+ "p_haul_pid",
+ "p_haul_service",
+ "p_haul_type",
+ "rpc_pb2",
+ "stats_pb2",
+ "util",
+ "xem_rpc"]
diff --git a/phaul/criu_api.py b/phaul/criu_api.py
new file mode 100644
index 0000000..146088a
--- /dev/null
+++ b/phaul/criu_api.py
@@ -0,0 +1,109 @@
+#
+# CRIU API
+# Includes class to work with CRIU service and helpers
+#
+
+import socket
+import struct
+import os
+import util
+import subprocess
+import rpc_pb2 as cr_rpc
+import stats_pb2 as crs
+
+criu_binary = "criu"
+
+req_types = {
+ cr_rpc.DUMP: "dump",
+ cr_rpc.PRE_DUMP: "pre_dump",
+ cr_rpc.PAGE_SERVER: "page_server",
+ cr_rpc.RESTORE: "restore",
+ cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
+ cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
+}
+
+cpuinfo_img_name = "cpuinfo.img"
+
+def_verb = 2
+
+#
+# Connection to CRIU service
+#
+
+class criu_conn:
+ def __init__(self, mem_sk):
+ self._iter = 0
+ self.verb = def_verb
+ css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ util.set_cloexec(css[1])
+ print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
+ self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
+ css[0].close()
+ self._cs = css[1]
+ self._last_req = -1
+ self._mem_fd = mem_sk.fileno()
+
+ def close(self):
+ self._cs.close()
+ self._swrk.wait()
+
+ def mem_sk_fileno(self):
+ return self._mem_fd
+
+ def verbose(self, level):
+ self.verb = level
+
+ def _recv_resp(self):
+ resp = cr_rpc.criu_resp()
+ resp.ParseFromString(self._cs.recv(1024))
+ if not resp.type in (cr_rpc.NOTIFY, self._last_req):
+ raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
+
+ return resp
+
+ def send_req(self, req):
+ req.opts.log_level = self.verb
+ req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
+ self._cs.send(req.SerializeToString())
+ self._iter += 1
+ self._last_req = req.type
+
+ return self._recv_resp()
+
+ def ack_notify(self, success = True):
+ req = cr_rpc.criu_req()
+ req.type = cr_rpc.NOTIFY
+ req.notify_success = True
+ self._cs.send(req.SerializeToString())
+
+ return self._recv_resp()
+
+#
+# Helper to read CRIU-generated statistics
+#
+
+CRIU_STATS_MAGIC = 0x57093306
+
+def criu_get_stats(img, file_name):
+ s = struct.Struct("I I")
+ f = open(os.path.join(img.work_dir(), file_name))
+ #
+	# Stats file is 4 bytes of magic, then 4 bytes with
+ # stats packet size
+ #
+ v = s.unpack(f.read(s.size))
+ if v[0] != CRIU_STATS_MAGIC:
+ raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
+
+ stats = crs.stats_entry()
+ stats.ParseFromString(f.read(v[1]))
+
+ return stats
+
+def criu_get_dstats(img):
+ stats = criu_get_stats(img, "stats-dump")
+ return stats.dump
+
+def criu_get_rstats(img):
+ stats = criu_get_stats(img, "stats-restore")
+ return stats.restore
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
new file mode 100644
index 0000000..54e231a
--- /dev/null
+++ b/phaul/fs_haul_shared.py
@@ -0,0 +1,28 @@
+#
+# Shared FS hauler (noop)
+#
+
+class p_haul_fs:
+ def __init__(self):
+ print "Initilized shared FS hauler"
+ pass
+
+ def set_target_host(self, thost):
+ pass
+
+ def set_work_dir(self, wdir):
+ pass
+
+ def start_migration(self):
+ pass
+
+ def next_iteration(self):
+ pass
+
+ def stop_migration(self):
+ pass
+
+ # Inode numbers do not change on this FS
+ # during migration
+ def persistent_inodes(self):
+ return True
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
new file mode 100644
index 0000000..3181c79
--- /dev/null
+++ b/phaul/fs_haul_subtree.py
@@ -0,0 +1,50 @@
+#
+# FS haul driver, that copies the subtree from
+# one node to another using rsync. It's used in
+# legacy OpenVZ configurations.
+#
+
+import subprocess as sp
+import os
+
+rsync_log_file = "rsync.log"
+
+class p_haul_fs:
+ def __init__(self, subtree_path):
+ print "Initialized subtree FS hauler (%s)" % subtree_path
+ self.__root = subtree_path
+ pass
+
+ def set_target_host(self, thost):
+ self.__thost = thost
+
+ def set_work_dir(self, wdir):
+ self.__wdir = wdir
+
+ def __run_rsync(self):
+ logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
+ dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
+
+ # First rsync might be very long. Wait for it not
+ # to produce big pause between the 1st pre-dump and
+ # .stop_migration
+
+ ret = sp.call(["rsync", "-a", self.__root, dst],
+ stdout = logf, stderr = logf)
+ if ret != 0:
+ raise Exception("Rsync failed")
+
+ def start_migration(self):
+ print "Starting FS migration"
+ self.__run_rsync()
+
+ def next_iteration(self):
+ pass
+
+ def stop_migration(self):
+ print "Doing final FS sync"
+ self.__run_rsync()
+
+ # When rsync-ing FS inodes number will change
+ def persistent_inodes(self):
+ return False
diff --git a/phaul/images.py b/phaul/images.py
new file mode 100644
index 0000000..a4d465c
--- /dev/null
+++ b/phaul/images.py
@@ -0,0 +1,176 @@
+#
+# images driver for migration (without FS transfer)
+#
+
+import os
+import tempfile
+import tarfile
+import time
+import shutil
+import time
+import threading
+import util
+import criu_api
+
+def_path = "/var/local/p.haul-fs/"
+
+class opendir:
+ def __init__(self, path):
+ self._dirname = path
+ self._dirfd = os.open(path, os.O_DIRECTORY)
+ util.set_cloexec(self)
+
+ def close(self):
+ os.close(self._dirfd)
+ os._dirname = None
+ os._dirfd = -1
+
+ def name(self):
+ return self._dirname
+
+ def fileno(self):
+ return self._dirfd
+
+class untar_thread(threading.Thread):
+ def __init__(self, sk, tdir):
+ threading.Thread.__init__(self)
+ self.__sk = sk
+ self.__dir = tdir
+
+ def run(self):
+ tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
+ tf.extractall(self.__dir)
+ tf.close()
+
+class img_tar:
+ def __init__(self, sk, dirname):
+ self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
+ self.__dir = dirname
+
+ def add(self, img, path = None):
+ if not path:
+ path = os.path.join(self.__dir, img)
+
+ self.__tf.add(path, img)
+
+ def close(self):
+ self.__tf.close()
+
+class phaul_images:
+ WDIR = 1
+ IMGDIR = 2
+
+ def __init__(self, typ):
+ self.current_iter = 0
+ self.sync_time = 0.0
+ self._typ = typ
+ self._keep_on_close = False
+ self._wdir = None
+ self._current_dir = None
+
+ def save_images(self):
+ print "Keeping images"
+ self._keep_on_close = True
+
+ def set_options(self, opts):
+ self._keep_on_close = opts["keep_images"]
+
+ suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
+ util.makedirs(opts["img_path"])
+ wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
+ self._wdir = opendir(wdir)
+ self._img_path = os.path.join(self._wdir.name(), "img")
+ os.mkdir(self._img_path)
+
+ def close(self):
+ if not self._wdir:
+ return
+
+ self._wdir.close()
+ if self._current_dir:
+ self._current_dir.close()
+
+ if not self._keep_on_close:
+ print "Removing images"
+ shutil.rmtree(self._wdir.name())
+ else:
+ print "Images are kept in %s" % self._wdir.name()
+ pass
+
+ def img_sync_time(self):
+ return self.sync_time
+
+ def new_image_dir(self):
+ if self._current_dir:
+ self._current_dir.close()
+ self.current_iter += 1
+ img_dir = "%s/%d" % (self._img_path, self.current_iter)
+ print "\tMaking directory %s" % img_dir
+ os.mkdir(img_dir)
+ self._current_dir = opendir(img_dir)
+
+ def image_dir_fd(self):
+ return self._current_dir.fileno()
+
+ def work_dir_fd(self):
+ return self._wdir.fileno()
+
+ def image_dir(self):
+ return self._current_dir.name()
+
+ def work_dir(self):
+ return self._wdir.name()
+
+ def prev_image_dir(self):
+ if self.current_iter == 1:
+ return None
+ else:
+ return "../%d" % (self.current_iter - 1)
+
+ # Images transfer
+ # Are there better ways for doing this?
+
+ def sync_imgs_to_target(self, th, htype, sock):
+ # Pre-dump doesn't generate any images (yet?)
+ # so copy only those from the top dir
+ print "Sending images to target"
+
+ start = time.time()
+ cdir = self.image_dir()
+
+ th.start_accept_images(phaul_images.IMGDIR)
+ tf = img_tar(sock, cdir)
+
+ print "\tPack"
+ for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
+ tf.add(img)
+
+ print "\tAdd htype images"
+ for himg in htype.get_meta_images(cdir):
+ tf.add(himg[1], himg[0])
+
+ tf.close()
+ th.stop_accept_images()
+
+ self.sync_time = time.time() - start
+
+ def send_cpuinfo(self, th, sock):
+ th.start_accept_images(phaul_images.WDIR)
+ tf = img_tar(sock, self.work_dir())
+ tf.add(criu_api.cpuinfo_img_name)
+ tf.close()
+ th.stop_accept_images()
+
+ def start_accept_images(self, dir_id, sk):
+ if dir_id == phaul_images.WDIR:
+ dirname = self.work_dir()
+ else:
+ dirname = self.image_dir()
+
+ self.__acc_tar = untar_thread(sk, dirname)
+ self.__acc_tar.start()
+ print "Started images server"
+
+ def stop_accept_images(self):
+ print "Waiting for images to unpack"
+ self.__acc_tar.join()
diff --git a/phaul/mstats.py b/phaul/mstats.py
new file mode 100644
index 0000000..f1a9c32
--- /dev/null
+++ b/phaul/mstats.py
@@ -0,0 +1,33 @@
+import time
+
+def usec2sec(usec):
+ return usec / 1000000.
+
+class migration_stats:
+ def __init__(self):
+ self._iter_fr_times = []
+ self._frozen_time = 0
+
+ def start(self):
+ self._start_time = time.time()
+
+ def stop(self, iters):
+ self._rst_time = iters.th.restore_time()
+ self._img_sync_time = iters.img.img_sync_time()
+ self._end_time = time.time()
+
+ self._print_stats()
+
+ def iteration(self, stats):
+ print "Dumped %d pages, %d skipped" % \
+ (stats.pages_written, stats.pages_skipped_parent)
+
+ self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
+ self._frozen_time += stats.frozen_time
+
+ def _print_stats(self):
+ print "Migration succeeded"
+ print "\t total time is ~%.2lf sec" % (self._end_time - self._start_time)
+ print "\t frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
+ print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
+ print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/phaul/p_haul_cgroup.py b/phaul/p_haul_cgroup.py
new file mode 100644
index 0000000..846ee62
--- /dev/null
+++ b/phaul/p_haul_cgroup.py
@@ -0,0 +1,79 @@
+#
+# CGroups manipulations for p.haul.
+#
+# FIXME Isn't it nicer to do it via libcgroup?
+#
+
+import os
+
+cg_root_dir = "/sys/fs/cgroup"
+cg_tasks_file = "tasks"
+
+def cg_line_parse(ln):
+ items = ln.split(":")
+ #
+	# If two controllers are merged together, we see
+	# their names comma-separated in proc. The problem
+	# is that the respective directory name in sysfs
+	# (!) IS NOT THE SAME: controller names can go
+	# reversed.
+	#
+	# That said, we just use the first name component;
+	# in sysfs there would be the respective symlink
+ #
+ cname = items[1].split(",")[0]
+ cdir = items[2]
+
+ return cname, cdir
+
+def dump_hier(pid, img):
+ print "\tSave CG for %d into %s" % (pid, img)
+ fd = open(img, "w")
+ cg = open("/proc/%d/cgroup" % pid)
+ for ln in cg:
+ cg_controller, cg_dir = cg_line_parse(ln)
+ if not cg_controller.startswith("name="):
+ fd.write("%s%s" % (cg_controller, cg_dir))
+
+ cg.close()
+ fd.close()
+
+#
+# The cpuset controller is unusable before at least one
+# cpu and memory node is set there. For restore it's OK
+# to copy parent masks into it, at the end we'll apply
+# "real" CT config
+#
+
+def cpuset_copy_parent(path, c):
+ c = "cpuset.%s" % c
+ ppath = os.path.dirname(path)
+ pfd = open(os.path.join(ppath, c))
+ cfd = open(os.path.join(path, c), "w")
+ cfd.write(pfd.read())
+ cfd.close()
+ pfd.close()
+
+def cpuset_allow_all(path):
+ cpuset_copy_parent(path, "cpus")
+ cpuset_copy_parent(path, "mems")
+
+def restore_one_controller(pid, ln):
+ cg_path = os.path.join(cg_root_dir, ln.strip())
+ print "[%s]" % cg_path
+ if not os.access(cg_path, os.F_OK):
+ os.makedirs(cg_path)
+ if ln.startswith("cpuset"):
+ cpuset_allow_all(cg_path)
+
+ tf = open(os.path.join(cg_path, cg_tasks_file), "w")
+ tf.write("%d" % pid)
+ tf.close()
+
+def restore_hier(pid, img):
+ print "\tCreate hier for %d from %s" % (pid, img)
+ fd = open(img)
+ for ln in fd:
+ restore_one_controller(pid, ln)
+ fd.close()
+ pass
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
new file mode 100644
index 0000000..0463e78
--- /dev/null
+++ b/phaul/p_haul_iters.py
@@ -0,0 +1,232 @@
+#
+# The P.HAUL core -- the class that drives migration
+#
+
+import images
+import mstats
+import xem_rpc
+import rpc_pb2 as cr_rpc
+import criu_api
+import p_haul_type
+
+# Constants for iterations management
+#
+# Maximum number of iterations
+phaul_iter_max = 8
+# If we dump less than this amount of pages we abort
+# iterations and go do the full dump
+phaul_iter_min_size = 64
+# Each iteration should dump less pages or at most
+# this % more than previous
+phaul_iter_grow_max = 10
+
+class phaul_iter_worker:
+ def __init__(self, p_type, host):
+ self._mstat = mstats.migration_stats()
+ self.iteration = 0
+ self.prev_stats = None
+
+ print "Connecting to target host"
+ self.th = xem_rpc.rpc_proxy(host)
+ self.data_sk = self.th.open_socket("datask")
+
+ print "Setting up local"
+ self.img = images.phaul_images("dmp")
+ self.criu = criu_api.criu_conn(self.data_sk)
+ self.htype = p_haul_type.get_src(p_type)
+ if not self.htype:
+ raise Exception("No htype driver found")
+
+ self.fs = self.htype.get_fs()
+ if not self.fs:
+ raise Exception("No FS driver found")
+
+ self.pid = self.htype.root_task_pid()
+ self.fs.set_target_host(host)
+
+ print "Setting up remote"
+ self.th.setup(p_type)
+
+
+ def make_dump_req(self, typ):
+ #
+ # Prepare generic request for (pre)dump
+ #
+
+ req = cr_rpc.criu_req()
+ req.type = typ
+ req.opts.pid = self.pid
+ req.opts.ps.fd = self.criu.mem_sk_fileno()
+ req.opts.track_mem = True
+
+ req.opts.images_dir_fd = self.img.image_dir_fd()
+ req.opts.work_dir_fd = self.img.work_dir_fd()
+ p_img = self.img.prev_image_dir()
+ if p_img:
+ req.opts.parent_img = p_img
+ if not self.fs.persistent_inodes():
+ req.opts.force_irmap = True
+
+ return req
+
+ def set_options(self, opts):
+ self.th.set_options(opts)
+ self.criu.verbose(opts["verbose"])
+ self.img.set_options(opts)
+ self.htype.set_options(opts)
+ self.__force = opts["force"]
+
+ def validate_cpu(self):
+ print "Checking CPU compatibility"
+
+ print " `- Dumping CPU info"
+ req = cr_rpc.criu_req()
+ req.type = cr_rpc.CPUINFO_DUMP
+ req.opts.images_dir_fd = self.img.work_dir_fd()
+ req.keep_open = True
+ resp = self.criu.send_req(req)
+ if not resp.success:
+ raise Exception("Can't dump cpuinfo")
+
+ print " `- Sending CPU info"
+ self.img.send_cpuinfo(self.th, self.data_sk)
+
+ print " `- Checking CPU info"
+ if not self.th.check_cpuinfo():
+ raise Exception("CPUs mismatch")
+
+ def start_migration(self):
+ self._mstat.start()
+
+ if not self.__force:
+ self.validate_cpu()
+
+ print "Preliminary FS migration"
+ self.fs.set_work_dir(self.img.work_dir())
+ self.fs.start_migration()
+
+ print "Starting iterations"
+ cc = self.criu
+
+ while True:
+ print "* Iteration %d" % self.iteration
+
+ self.th.start_iter()
+ self.img.new_image_dir()
+
+ print "\tIssuing pre-dump command to service"
+
+ req = self.make_dump_req(cr_rpc.PRE_DUMP)
+ resp = cc.send_req(req)
+ if not resp.success:
+ raise Exception("Pre-dump failed")
+
+ print "\tPre-dump succeeded"
+
+ self.th.end_iter()
+
+ stats = criu_api.criu_get_dstats(self.img)
+ self._mstat.iteration(stats)
+
+ #
+ # Need to decide whether we do next iteration
+ # or stop on the existing and go do full dump
+ # and restore
+ #
+
+ print "Checking iteration progress:"
+
+ if stats.pages_written <= phaul_iter_min_size:
+ print "\t> Small dump"
+ break;
+
+ if self.prev_stats:
+ w_add = stats.pages_written - self.prev_stats.pages_written
+ w_add = w_add * 100 / self.prev_stats.pages_written
+ if w_add > phaul_iter_grow_max:
+ print "\t> Iteration grows"
+ break
+
+ if self.iteration >= phaul_iter_max:
+ print "\t> Too many iterations"
+ break
+
+ self.iteration += 1
+ self.prev_stats = stats
+ print "\t> Proceed to next iteration"
+
+ self.fs.next_iteration()
+
+ #
+ # Finish with iterations -- do full dump, send images
+ # to target host and restore from them there
+ #
+
+ print "Final dump and restore"
+
+ self.th.start_iter()
+ self.img.new_image_dir()
+
+ print "\tIssuing dump command to service"
+ req = self.make_dump_req(cr_rpc.DUMP)
+ req.opts.notify_scripts = True
+ req.opts.file_locks = True
+ req.opts.evasive_devices = True
+ req.opts.link_remap = True
+ if self.htype.can_migrate_tcp():
+ req.opts.tcp_established = True
+
+ resp = cc.send_req(req)
+ while True:
+ if resp.type != cr_rpc.NOTIFY:
+ raise Exception("Dump failed")
+
+ if resp.notify.script == "post-dump":
+ #
+ # Dump is effectively over. Now CRIU
+ # waits for us to do whatever we want
+ # and keeps the tasks frozen.
+ #
+ break
+
+ elif resp.notify.script == "network-lock":
+ self.htype.net_lock()
+ elif resp.notify.script == "network-unlock":
+ self.htype.net_unlock()
+
+ print "\t\tNotify (%s)" % resp.notify.script
+ resp = cc.ack_notify()
+
+ print "Dump complete"
+ self.th.end_iter()
+
+ #
+ # Dump is complete -- go to target node,
+ # restore them there and kill (if required)
+ # tasks on source node
+ #
+
+ print "Final FS and images sync"
+ self.fs.stop_migration()
+ self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
+
+ print "Asking target host to restore"
+ self.th.restore_from_images()
+
+ #
+ # Ack the notify after restore -- CRIU would
+ # then terminate all tasks and send us back
+ # DUMP/success message
+ #
+
+ resp = cc.ack_notify()
+ if not resp.success:
+ raise Exception("Dump screwed up")
+
+ self.htype.umount()
+
+ stats = criu_api.criu_get_dstats(self.img)
+ self._mstat.iteration(stats)
+ self._mstat.stop(self)
+ self.img.close()
+ cc.close()
diff --git a/phaul/p_haul_lxc.py b/phaul/p_haul_lxc.py
new file mode 100644
index 0000000..27df698
--- /dev/null
+++ b/phaul/p_haul_lxc.py
@@ -0,0 +1,166 @@
+#
+# LinuX Containers hauler module
+#
+
+import os
+import shutil
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+from subprocess import Popen, PIPE
+
+name = "lxc"
+lxc_dir = "/var/lib/lxc/"
+lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
+cg_image_name = "lxccg.img"
+
+class p_haul_type:
+ def __init__(self, name):
+ self._ctname = name
+ #
+ # This list would contain (v_in, v_out, v_br) tuples where
+ # v_in is the name of veth device in CT
+ # v_out is its peer on the host
+	# v_bridge is the bridge to which this veth is attached
+ #
+ self._veths = []
+ self._cfg = {}
+
+ def __load_ct_config(self):
+ print "Loading config file from %s" % self.__ct_config()
+
+ self._cfg = {}
+ self._veths = []
+
+ veth = None
+
+ ifd = open(self.__ct_config())
+ for line in ifd:
+ if not ("=" in line):
+ continue
+ k, v = map(lambda a: a.strip(), line.split("=", 1))
+ self._cfg[k] = v
+
+ if k == "lxc.network.type":
+ if v != "veth":
+ raise Exception("Unsupported network device type: %s", v)
+ if veth:
+ self._veths.append(veth)
+ veth = util.net_dev()
+ elif k == "lxc.network.link":
+ veth.link = v
+ elif k == "lxc.network.name":
+ veth.name = v
+ elif k == "lxc.network.veth.pair":
+ veth.pair = v
+ if veth:
+ self._veths.append(veth)
+ ifd.close()
+
+ def __apply_cg_config(self):
+ print "Applying CT configs"
+ # FIXME -- implement
+ pass
+
+ def init_src(self):
+ self._fs_mounted = True
+ self._bridged = True
+ self.__load_ct_config()
+
+ def init_dst(self):
+ self._fs_mounted = False
+ self._bridged = False
+ self.__load_ct_config()
+
+ def set_options(self, opts):
+ pass
+
+ def root_task_pid(self):
+ pid = -1;
+
+ pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
+ for l in pd.stdout:
+ if l.startswith("PID:"):
+ pid = int(l.split(":")[1])
+ status = pd.wait()
+ if status:
+ raise Exception("lxc info -n %s failed: %d" %
+ (self._ctname, status))
+ if pid == -1:
+ raise Exception("CT isn't running")
+ return pid
+
+ def __ct_rootfs(self):
+ return self._cfg['lxc.rootfs']
+
+ def __ct_root(self):
+ return os.path.join(lxc_rootfs_dir, self._ctname)
+
+ def __ct_config(self):
+ return os.path.join(lxc_dir, self._ctname, "config")
+
+ #
+ # Meta-images for LXC -- container config and info about CGroups
+ #
+ def get_meta_images(self, dir):
+ cg_img = os.path.join(dir, cg_image_name)
+ p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+ cfg_name = self.__ct_config()
+ return [ (cfg_name, "config"),
+ (cg_img, cg_image_name) ]
+
+ def put_meta_images(self, dir):
+ print "Putting config file into %s" % lxc_dir
+
+ shutil.copy(os.path.join(dir, "config"), self.__ct_config())
+
+ # Keep this name, we'll need one in prepare_ct()
+ self.cg_img = os.path.join(dir, cg_image_name)
+
+ #
+ # Create cgroup hierarchy and put root task into it
+ # Hierarchy is unlimited, we will apply config limitations
+ # in ->restored->__apply_cg_config later
+ #
+ def prepare_ct(self, pid):
+ p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+ def mount(self):
+ nroot = self.__ct_root()
+ print "Mounting CT root to %s" % nroot
+ if not os.access(nroot, os.F_OK):
+ os.makedirs(nroot)
+ os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
+ self._fs_mounted = True
+ return nroot
+
+ def get_fs(self):
+ return fs_haul_shared.p_haul_fs()
+
+ def restored(self, pid):
+ self.__apply_cg_config()
+
+ def net_lock(self):
+ for veth in self._veths:
+ util.ifdown(veth.pair)
+
+ def net_unlock(self):
+ for veth in self._veths:
+ util.ifup(veth.pair)
+ if veth.link and not self._bridged:
+ util.bridge_add(veth.pair, veth.link)
+
+ def can_migrate_tcp(self):
+ return True
+
+ def umount(self):
+ pass
+
+ def veths(self):
+ #
+ # Caller wants to see list of tuples with [0] being name
+ # in CT and [1] being name on host. Just return existing
+ # tuples, the [2] with bridge name wouldn't hurt
+ #
+ return self._veths
diff --git a/phaul/p_haul_ovz.py b/phaul/p_haul_ovz.py
new file mode 100644
index 0000000..44ddbb5
--- /dev/null
+++ b/phaul/p_haul_ovz.py
@@ -0,0 +1,190 @@
+#
+# OpenVZ containers hauler module
+#
+
+import os
+import shutil
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+
+name = "ovz"
+vzpid_dir = "/var/lib/vzctl/vepid/"
+vz_dir = "/vz"
+vzpriv_dir = "%s/private" % vz_dir
+vzroot_dir = "%s/root" % vz_dir
+vz_conf_dir = "/etc/vz/conf/"
+vz_pidfiles = "/var/lib/vzctl/vepid/"
+cg_image_name = "ovzcg.img"
+
+class p_haul_type:
+ def __init__(self, ctid):
+ self._ctid = ctid
+ #
+ # This list would contain (v_in, v_out, v_br) tuples where
+ # v_in is the name of veth device in CT
+ # v_out is its peer on the host
+	# v_bridge is the bridge to which this veth is attached
+ #
+ self._veths = []
+ self._cfg = []
+
+ def __load_ct_config(self, path):
+ print "Loading config file from %s" % path
+ ifd = open(os.path.join(path, self.__ct_config()))
+ for line in ifd:
+ self._cfg.append(line)
+
+ if line.startswith("NETIF="):
+ #
+ # Parse and keep veth pairs, later we will
+ # equip restore request with this data and
+ # will use it while (un)locking the network
+ #
+ v_in = None
+ v_out = None
+ v_bridge = None
+ vs = line.strip().split("=", 1)[1].strip("\"")
+ for parm in vs.split(","):
+ pa = parm.split("=")
+ if pa[0] == "ifname":
+ v_in = pa[1]
+ elif pa[0] == "host_ifname":
+ v_out = pa[1]
+ elif pa[0] == "bridge":
+ v_bridge = pa[1]
+
+ if v_in and v_out:
+ print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
+ veth = util.net_dev()
+ veth.name = v_in
+ veth.pair = v_out
+ veth.link = v_bridge
+ self._veths.append(veth)
+
+ ifd.close()
+
+ def __apply_cg_config(self):
+ print "Applying CT configs"
+ # FIXME -- implement
+ pass
+
+ def init_src(self):
+ self._fs_mounted = True
+ self._bridged = True
+ self.__load_ct_config(vz_conf_dir)
+
+ def init_dst(self):
+ self._fs_mounted = False
+ self._bridged = False
+
+ def set_options(self, opts):
+ pass
+
+ def root_task_pid(self):
+ pf = open(os.path.join(vzpid_dir, self._ctid))
+ pid = pf.read()
+ return int(pid)
+
+ def __ct_priv(self):
+ return "%s/%s" % (vzpriv_dir, self._ctid)
+
+ def __ct_root(self):
+ return "%s/%s" % (vzroot_dir, self._ctid)
+
+ def __ct_config(self):
+ return "%s.conf" % self._ctid
+
+ #
+ # Meta-images for OVZ -- container config and info about CGroups
+ #
+ def get_meta_images(self, path):
+ cg_img = os.path.join(path, cg_image_name)
+ p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+ cfg_name = self.__ct_config()
+ return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
+ (cg_img, cg_image_name) ]
+
+ def put_meta_images(self, path):
+ print "Putting config file into %s" % vz_conf_dir
+
+ self.__load_ct_config(path)
+ ofd = open(os.path.join(vz_conf_dir, self.__ct_config()), "w")
+ ofd.writelines(self._cfg)
+ ofd.close()
+
+ # Keep this name, we'll need one in prepare_ct()
+ self.cg_img = os.path.join(path, cg_image_name)
+
+ #
+ # Create cgroup hierarchy and put root task into it
+ # Hierarchy is unlimited, we will apply config limitations
+ # in ->restored->__apply_cg_config later
+ #
+ def prepare_ct(self, pid):
+ p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+ def __umount_root(self):
+ print "Umounting CT root"
+ os.system("umount %s" % self.__ct_root())
+ self._fs_mounted = False
+
+ def mount(self):
+ nroot = self.__ct_root()
+ print "Mounting CT root to %s" % nroot
+ if not os.access(nroot, os.F_OK):
+ os.makedirs(nroot)
+ os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
+ self._fs_mounted = True
+ return nroot
+
+ def umount(self):
+ if self._fs_mounted:
+ self.__umount_root()
+
+ def get_fs(self):
+ rootfs = util.path_to_fs(self.__ct_priv())
+ if not rootfs:
+ print "CT is on unknown FS"
+ return None
+
+ print "CT is on %s" % rootfs
+
+ if rootfs == "nfs":
+ return fs_haul_shared.p_haul_fs()
+ if rootfs == "ext3" or rootfs == "ext4":
+ return fs_haul_subtree.p_haul_fs(self.__ct_priv())
+
+ print "Unknown CT FS"
+ return None
+
+ def restored(self, pid):
+ print "Writing pidfile"
+ pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
+ pidfile.write("%d" % pid)
+ pidfile.close()
+
+ self.__apply_cg_config()
+
+ def net_lock(self):
+ for veth in self._veths:
+ util.ifdown(veth.pair)
+
+ def net_unlock(self):
+ for veth in self._veths:
+ util.ifup(veth.pair)
+ if veth.link and not self._bridged:
+ util.bridge_add(veth.pair, veth.link)
+
+ def can_migrate_tcp(self):
+ return True
+
+ def veths(self):
+ #
+ # Caller wants to see list of tuples with [0] being name
+ # in CT and [1] being name on host. Just return existing
+ # tuples, the [2] with bridge name wouldn't hurt
+ #
+ return self._veths
+
diff --git a/phaul/p_haul_pid.py b/phaul/p_haul_pid.py
new file mode 100644
index 0000000..f0051b0
--- /dev/null
+++ b/phaul/p_haul_pid.py
@@ -0,0 +1,84 @@
+#
+# Individual process hauler
+#
+
+import fs_haul_shared
+
+name = "pid"
+
+class p_haul_type:
+	# Hauler for a plain process (sub)tree identified by its pid.
+	# The simplest htype; its methods document the interface every
+	# p_haul_type implementation is expected to provide.
+	def __init__(self, id):
+		self.pid = int(id)
+		self._pidfile = None
+
+
+	#
+	# Initialize itself for source node or destination one
+	#
+	def init_src(self):
+		pass
+	def init_dst(self):
+		pass
+
+	def set_options(self, opts):
+		# dst_rpid -- file to write the restored root task pid to
+		# (may be None); pid_root -- FS root for restore, see mount()
+		self._pidfile = opts["dst_rpid"]
+		self._fs_root = opts["pid_root"]
+
+	# Report the pid of the root task of what we're
+	# going to migrate
+	def root_task_pid(self):
+		return self.pid
+
+	# Prepare filesystem before restoring. Return
+	# the new root, if required. 'None' will mean
+	# that things will get restored in the current
+	# mount namespace and w/o chroot
+	def mount(self):
+		return self._fs_root
+
+	# Remove any specific FS setup
+	def umount(self):
+		pass
+
+	# Get driver for FS migration; a bare process tree is served by
+	# the shared-FS (no copy needed) driver
+	def get_fs(self):
+		return fs_haul_shared.p_haul_fs()
+
+	# Get list of files which should be copied to
+	# the destination node. The dir argument is where
+	# temporary stuff can be put
+	def get_meta_images(self, dir):
+		return []
+
+	# Take your files from dir and put in whatever
+	# places are appropriate. Paths (relative) are
+	# preserved.
+	def put_meta_images(self, dir):
+		pass
+
+	# Things are started to get restored, only the
+	# first task (with pid @pid) is created.
+	def prepare_ct(self, pid):
+		pass
+
+	# Restoring done, the new top task has pid @pid; record it if a
+	# pidfile was requested via dst_rpid
+	def restored(self, pid):
+		if self._pidfile:
+			print "Writing rst pidfile"
+			open(self._pidfile, "w").writelines(["%d" % pid])
+
+	#
+	# Lock and unlock networking -- nothing to do for a bare process
+	#
+	def net_lock(self):
+		pass
+
+	def net_unlock(self):
+		pass
+
+	# Established TCP connections are not preserved for this htype
+	def can_migrate_tcp(self):
+		return False
+
+	# Get list of veth pairs if any
+	def veths(self):
+		return []
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
new file mode 100644
index 0000000..5587bd7
--- /dev/null
+++ b/phaul/p_haul_service.py
@@ -0,0 +1,154 @@
+#
+# P.HAUL code, that helps on the target node (rpyc service)
+#
+
+import xem_rpc
+import rpc_pb2 as cr_rpc
+import images
+import criu_api
+import p_haul_type
+
+class phaul_service:
+	# Target-node side of the migration, driven over xem_rpc: the
+	# source invokes the rpc_* methods below; the on_* methods are
+	# xem_rpc connection callbacks.
+	def on_connect(self):
+		# Per-connection state lives on the service object itself
+		print "Connected"
+		self.dump_iter = 0
+		self.restored = False
+		self.criu = None
+		self.data_sk = None
+		self.img = None
+		self.htype = None
+
+	def on_disconnect(self):
+		# Release everything acquired during the session. If restore
+		# did not happen, umount the CT root and keep the received
+		# images for inspection.
+		print "Disconnected"
+		if self.criu:
+			self.criu.close()
+
+		if self.data_sk:
+			self.data_sk.close()
+
+		if self.htype and not self.restored:
+			self.htype.umount()
+
+		if self.img:
+			print "Closing images"
+			if not self.restored:
+				self.img.save_images()
+			self.img.close()
+
+	def on_socket_open(self, sk, uname):
+		# Second (data) connection from the source; carries memory
+		# pages and image files
+		self.data_sk = sk
+		print "Data socket (%s) accepted" % uname
+
+	def rpc_setup(self, htype_id):
+		print "Setting up service side", htype_id
+		self.img = images.phaul_images("rst")
+		self.criu = criu_api.criu_conn(self.data_sk)
+		self.htype = p_haul_type.get_dst(htype_id)
+
+	def rpc_set_options(self, opts):
+		# Fan the options dict out to all interested parties
+		self.criu.verbose(opts["verbose"])
+		self.img.set_options(opts)
+		self.htype.set_options(opts)
+
+	def start_page_server(self):
+		# Ask criu to start a page server receiving memory for the
+		# current dump iteration over the data socket
+		print "Starting page server for iter %d" % self.dump_iter
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.PAGE_SERVER
+		req.keep_open = True
+		req.opts.ps.fd = self.criu.mem_sk_fileno()
+
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		p_img = self.img.prev_image_dir()
+		if p_img:
+			# Incremental iteration -- base it on the previous one
+			req.opts.parent_img = p_img
+
+		print "\tSending criu rpc req"
+		resp = self.criu.send_req(req)
+		if not resp.success:
+			raise Exception("Failed to start page server")
+
+		print "\tPage server started at %d" % resp.ps.pid
+
+	def rpc_start_iter(self):
+		self.dump_iter += 1
+		self.img.new_image_dir()
+		self.start_page_server()
+
+	def rpc_end_iter(self):
+		pass
+
+	def rpc_start_accept_images(self, dir_id):
+		self.img.start_accept_images(dir_id, self.data_sk)
+
+	def rpc_stop_accept_images(self):
+		self.img.stop_accept_images()
+
+	def rpc_check_cpuinfo(self):
+		# Verify the source CPU info (in the images) is compatible
+		# with this host.
+		# NOTE(review): images_dir_fd is set to work_dir_fd() here,
+		# unlike the other requests -- confirm this is intended.
+		print "Checking cpuinfo"
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.CPUINFO_CHECK
+		req.opts.images_dir_fd = self.img.work_dir_fd()
+		req.keep_open = True
+		resp = self.criu.send_req(req)
+		print " `-", resp.success
+		return resp.success
+
+	def rpc_restore_from_images(self):
+		# Final stage: let the htype place its meta files, then drive
+		# a criu restore, ACK-ing notifications as they arrive.
+		print "Restoring from images"
+		self.htype.put_meta_images(self.img.image_dir())
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.RESTORE
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		req.opts.notify_scripts = True
+
+		if self.htype.can_migrate_tcp():
+			req.opts.tcp_established = True
+
+		for veth in self.htype.veths():
+			v = req.opts.veths.add()
+			v.if_in = veth.name
+			v.if_out = veth.pair
+
+		nroot = self.htype.mount()
+		if nroot:
+			req.opts.root = nroot
+			print "Restore root set to %s" % req.opts.root
+
+		cc = self.criu
+		resp = cc.send_req(req)
+		while True:
+			if resp.type == cr_rpc.NOTIFY:
+				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
+				if resp.notify.script == "setup-namespaces":
+					#
+					# At that point we have only one task
+					# living in namespaces and waiting for
+					# us to ACK the notify. Htype might want
+					# to configure namespace (external net
+					# devices) and cgroups
+					#
+					self.htype.prepare_ct(resp.notify.pid)
+				elif resp.notify.script == "network-unlock":
+					self.htype.net_unlock()
+				elif resp.notify.script == "network-lock":
+					# Network was locked on the source; locking
+					# again during restore means a protocol error
+					raise Exception("Locking network on restore?")
+
+				resp = cc.ack_notify()
+				continue
+
+			if not resp.success:
+				raise Exception("Restore failed")
+
+			print "Restore succeeded"
+			break
+
+		self.htype.restored(resp.restore.pid)
+		self.restored = True
+
+	def rpc_restore_time(self):
+		# Report criu's measured restore time from the stats image
+		stats = criu_api.criu_get_rstats(self.img)
+		return stats.restore_time
diff --git a/phaul/p_haul_type.py b/phaul/p_haul_type.py
new file mode 100644
index 0000000..6ac093d
--- /dev/null
+++ b/phaul/p_haul_type.py
@@ -0,0 +1,33 @@
+#
+# Haulers' type selector
+# Types (htypes) are classes, that help hauling processes.
+# See p_haul_pid for comments of how a class should look like.
+#
+
+import p_haul_ovz
+import p_haul_pid
+import p_haul_lxc
+
+# Registry of known haulers: maps an htype name ("ovz", "pid", "lxc")
+# to the module implementing its p_haul_type class
+haul_types = {
+	p_haul_ovz.name: p_haul_ovz,
+	p_haul_pid.name: p_haul_pid,
+	p_haul_lxc.name: p_haul_lxc,
+}
+
+def __get(id):
+ if haul_types.has_key(id[0]):
+ h_type = haul_types[id[0]]
+ return h_type.p_haul_type(id[1])
+ else:
+ print "Unknown type. Try one of", haul_types.keys()
+ return None
+
+def get_src(id):
+	# Build and source-initialize the hauler for id = (name, arg).
+	# NOTE(review): when the type is unknown __get() returns None and
+	# the init_src() call below raises AttributeError -- confirm
+	# callers rely on that rather than on a cleaner error.
+	ht = __get(id)
+	ht.init_src()
+	return ht
+
+def get_dst(id):
+	# Build and destination-initialize the hauler for id = (name, arg).
+	# NOTE(review): same None/AttributeError caveat as get_src().
+	ht = __get(id)
+	ht.init_dst()
+	return ht
diff --git a/phaul/util.py b/phaul/util.py
new file mode 100644
index 0000000..9883bef
--- /dev/null
+++ b/phaul/util.py
@@ -0,0 +1,46 @@
+import os
+import fcntl
+import errno
+
+class net_dev:
+	# Plain record describing one veth device:
+	# name -- device name inside the CT, pair -- host-side pair name,
+	# link -- host bridge the pair end is plugged into (see bridge_add).
+	# NOTE(review): the initializer is named init(), not __init__(),
+	# so it is NOT run on construction and must be called explicitly
+	# -- confirm this is intentional.
+	def init(self):
+		self.name = None
+		self.pair = None
+		self.link = None
+
+def path_to_fs(path):
+ dev = os.stat(path)
+ dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
+ mfd = open("/proc/self/mountinfo")
+ for ln in mfd:
+ ln_p = ln.split(None, 9)
+ if dev_str == ln_p[2]:
+ return ln_p[8]
+
+ return None
+
+def ifup(ifname):
+	# Bring a network device up via iproute2; best-effort, the shell
+	# exit code is ignored
+	print "\t\tUpping %s" % ifname
+	os.system("ip link set %s up" % ifname)
+
+def ifdown(ifname):
+	# Bring a network device down via iproute2; best-effort, the
+	# shell exit code is ignored
+	print "\t\tDowning %s" % ifname
+	os.system("ip link set %s down" % ifname)
+
+def bridge_add(ifname, brname):
+	# Plug device ifname into bridge brname via brctl; best-effort,
+	# the shell exit code is ignored
+	print "\t\tAdd %s to %s" % (ifname, brname)
+	os.system("brctl addif %s %s" % (brname, ifname))
+
+def set_cloexec(sk):
+ fd = sk.fileno()
+ flags = fcntl.fcntl(sk, fcntl.F_GETFD)
+ fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+def makedirs(dirpath):
+	# mkdir -p: create dirpath and any missing parents, tolerating a
+	# pre-existing directory (python2 os.makedirs has no exist_ok);
+	# any other OSError propagates
+	try:
+		os.makedirs(dirpath)
+	except OSError as er:
+		if er.errno == errno.EEXIST and os.path.isdir(dirpath):
+			pass
+		else:
+			raise
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
new file mode 100644
index 0000000..8bbdac7
--- /dev/null
+++ b/phaul/xem_rpc.py
@@ -0,0 +1,198 @@
+import socket
+import select
+import threading
+import traceback
+import util
+import struct
+
+rpc_port = 12345
+rpc_sk_buf = 256	# max bytes per recv(); a whole message must fit
+
+# Request types
+RPC_CMD = 1		# server-side command (init_rpc, get_name, pick_channel)
+RPC_CALL = 2		# call to a master object's rpc_* method
+
+# Response types
+RPC_RESP = 1		# success, payload is the return value
+RPC_EXC = 2		# remote raised, payload is the exception
+
+#
+# Client
+#
+
+class _rpc_proxy_caller:
+	# Callable standing for one remote function. A call is serialized
+	# with repr(), sent over the rpc socket, and the (type, payload)
+	# reply is eval()ed back.
+	def __init__(self, sk, typ, fname):
+		self._rpc_sk = sk
+		self._fn_typ = typ	# RPC_CMD or RPC_CALL
+		self._fn_name = fname
+
+	def __call__(self, *args):
+		call = (self._fn_typ, self._fn_name, args)
+		raw_data = repr(call)
+		self._rpc_sk.send(raw_data)
+		# NOTE(review): a single recv() assumes the whole reply fits
+		# in rpc_sk_buf bytes and arrives in one piece
+		raw_data = self._rpc_sk.recv(rpc_sk_buf)
+		# SECURITY: eval() of data from the peer -- this protocol is
+		# only safe between mutually trusted nodes
+		resp = eval(raw_data)
+
+		if resp[0] == RPC_RESP:
+			return resp[1]
+		elif resp[0] == RPC_EXC:
+			print "Remote exception"
+			raise Exception(resp[1])
+		else:
+			raise Exception("Proto resp error")
+
+class rpc_proxy:
+	# Client-side endpoint: any attribute access returns a caller
+	# that invokes the same-named rpc_* method on the remote service.
+	def __init__(self, conn, *args):
+		self._srv = conn	# server host to connect to
+		self._rpc_sk = self._make_sk()
+		util.set_cloexec(self._rpc_sk)
+		# Announce ourselves; args are forwarded to on_connect()
+		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
+
+	def __getattr__(self, attr):
+		return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
+
+	def _make_sk(self):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.connect((self._srv, rpc_port))
+		return sk
+
+	def open_socket(self, uname):
+		# Open an extra (data) connection and ask the server to hand
+		# it over to our master object via on_socket_open(sk, uname)
+		sk = self._make_sk()
+		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
+		c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
+		c(host, uname)
+		return sk
+
+
+
+#
+# Server
+#
+
+class _rpc_server_sk:
+	# One accepted client connection. Dispatches incoming requests
+	# either to itself (RPC_CMD: init_rpc, get_name, pick_channel) or
+	# to its master service object (RPC_CALL -> rpc_<name> methods).
+	def __init__(self, sk):
+		self._sk = sk
+		self._master = None
+
+	def fileno(self):
+		# For select() in the manager loop
+		return self._sk.fileno()
+
+	def hash_name(self):
+		# The peer address doubles as the key in the manager's map
+		return self._sk.getpeername()
+
+	def get_name(self, mgr):
+		return self.hash_name()
+
+	def work(self, mgr):
+		# NOTE(review): a single recv() assumes a whole request
+		# arrives at once and fits in rpc_sk_buf bytes
+		raw_data = self._sk.recv(rpc_sk_buf)
+		if not raw_data:
+			# Peer closed the connection
+			mgr.remove(self)
+			if self._master:
+				self._master.on_disconnect()
+			self._sk.close()
+			return
+
+		# SECURITY: eval() of peer-supplied data -- this protocol is
+		# only safe between mutually trusted nodes
+		data = eval(raw_data)
+		try:
+			if data[0] == RPC_CALL:
+				if not self._master:
+					raise Exception("Proto seq error")
+
+				res = getattr(self._master, "rpc_" + data[1])(*data[2])
+			elif data[0] == RPC_CMD:
+				res = getattr(self, data[1])(mgr, *data[2])
+			else:
+				raise Exception(("Proto typ error", data[0]))
+		except Exception as e:
+			# Ship the exception back instead of killing the server
+			traceback.print_exc()
+			res = (RPC_EXC, e)
+		else:
+			res = (RPC_RESP, res)
+
+		raw_data = repr(res)
+		self._sk.send(raw_data)
+
+	def init_rpc(self, mgr, args):
+		# First command on a connection: create the per-client master
+		# object and forward the client's arguments to on_connect()
+		util.set_cloexec(self)
+		self._master = mgr.make_master()
+		self._master.on_connect(*args)
+
+	def pick_channel(self, mgr, hash_name, uname):
+		# Hand a previously accepted raw socket over to the master
+		# (the data channel, see rpc_proxy.open_socket)
+		sk = mgr.pick_sk(hash_name)
+		if sk:
+			self._master.on_socket_open(sk._sk, uname)
+
+class _rpc_server_ask:
+	# Listening socket: accepts new clients and wraps them into
+	# _rpc_server_sk objects on the manager's poll list.
+	# NOTE(review): binds 0.0.0.0:rpc_port with no authentication,
+	# and the protocol eval()s peer data -- expose this service on
+	# trusted networks only.
+	def __init__(self):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.bind(("0.0.0.0", rpc_port))
+		sk.listen(8)
+		self._sk = sk
+		util.set_cloexec(self)
+
+	def fileno(self):
+		# For select() in the manager loop
+		return self._sk.fileno()
+
+	def work(self, mgr):
+		sk, addr = self._sk.accept()
+		mgr.add(_rpc_server_sk(sk))
+
+class _rpc_stop_fd:
+	# Poll-list entry that stops the manager loop as soon as its fd
+	# (one end of a socketpair, see rpc_threaded_srv.get_stop_fd)
+	# becomes readable.
+	def __init__(self, fd):
+		self._fd = fd
+
+	def fileno(self):
+		return self._fd.fileno()
+
+	def work(self, mgr):
+		mgr.stop()
+
+class _rpc_server_manager:
+	# select()-based loop over the listener, client sockets and an
+	# optional stop fd; also owns the peer-name -> socket map used by
+	# pick_channel.
+	def __init__(self, srv_class):
+		self._srv_class = srv_class	# factory for master objects
+		self._sk_by_name = {}
+		self._poll_list = [_rpc_server_ask()]
+		self._alive = True
+
+	def add(self, sk):
+		self._sk_by_name[sk.hash_name()] = sk
+		self._poll_list.append(sk)
+
+	def remove(self, sk):
+		self._sk_by_name.pop(sk.hash_name())
+		self._poll_list.remove(sk)
+
+	def pick_sk(self, hash_name):
+		# Detach a socket from the poll loop so its raw fd can be
+		# handed away (data channel); returns None if unknown
+		sk = self._sk_by_name.pop(hash_name, None)
+		if sk:
+			self._poll_list.remove(sk)
+		return sk
+
+	def make_master(self):
+		return self._srv_class()
+
+	def stop(self):
+		self._alive = False
+
+	def loop(self, stop_fd):
+		if stop_fd:
+			self._poll_list.append(_rpc_stop_fd(stop_fd))
+
+		while self._alive:
+			r, w, x = select.select(self._poll_list, [], [])
+			for sk in r:
+				sk.work(self)
+
+		print "RPC Service stops"
+
+class rpc_threaded_srv(threading.Thread):
+	# Runs the rpc manager loop in a background thread. get_stop_fd()
+	# hands out a socket; writing to (or closing) it makes the other
+	# end readable, which stops the loop via _rpc_stop_fd.
+	def __init__(self, srv_class):
+		threading.Thread.__init__(self)
+		self._mgr = _rpc_server_manager(srv_class)
+		self._stop_fd = None
+
+	def run(self):
+		self._mgr.loop(self._stop_fd)
+
+	def get_stop_fd(self):
+		# Call before start(): run() reads self._stop_fd once at
+		# loop entry
+		sks = socket.socketpair()
+		self._stop_fd = sks[0]
+		return sks[1]
diff --git a/util.py b/util.py
deleted file mode 100644
index 9883bef..0000000
--- a/util.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import fcntl
-import errno
-
-class net_dev:
- def init(self):
- self.name = None
- self.pair = None
- self.link = None
-
-def path_to_fs(path):
- dev = os.stat(path)
- dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
- mfd = open("/proc/self/mountinfo")
- for ln in mfd:
- ln_p = ln.split(None, 9)
- if dev_str == ln_p[2]:
- return ln_p[8]
-
- return None
-
-def ifup(ifname):
- print "\t\tUpping %s" % ifname
- os.system("ip link set %s up" % ifname)
-
-def ifdown(ifname):
- print "\t\tDowning %s" % ifname
- os.system("ip link set %s down" % ifname)
-
-def bridge_add(ifname, brname):
- print "\t\tAdd %s to %s" % (ifname, brname)
- os.system("brctl addif %s %s" % (brname, ifname))
-
-def set_cloexec(sk):
- fd = sk.fileno()
- flags = fcntl.fcntl(sk, fcntl.F_GETFD)
- fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-
-def makedirs(dirpath):
- try:
- os.makedirs(dirpath)
- except OSError as er:
- if er.errno == errno.EEXIST and os.path.isdir(dirpath):
- pass
- else:
- raise
diff --git a/xem_rpc.py b/xem_rpc.py
deleted file mode 100644
index 8bbdac7..0000000
--- a/xem_rpc.py
+++ /dev/null
@@ -1,198 +0,0 @@
-import socket
-import select
-import threading
-import traceback
-import util
-import struct
-
-rpc_port = 12345
-rpc_sk_buf = 256
-
-RPC_CMD = 1
-RPC_CALL = 2
-
-RPC_RESP = 1
-RPC_EXC = 2
-
-#
-# Client
-#
-
-class _rpc_proxy_caller:
- def __init__(self, sk, typ, fname):
- self._rpc_sk = sk
- self._fn_typ = typ
- self._fn_name = fname
-
- def __call__(self, *args):
- call = (self._fn_typ, self._fn_name, args)
- raw_data = repr(call)
- self._rpc_sk.send(raw_data)
- raw_data = self._rpc_sk.recv(rpc_sk_buf)
- resp = eval(raw_data)
-
- if resp[0] == RPC_RESP:
- return resp[1]
- elif resp[0] == RPC_EXC:
- print "Remote exception"
- raise Exception(resp[1])
- else:
- raise Exception("Proto resp error")
-
-class rpc_proxy:
- def __init__(self, conn, *args):
- self._srv = conn
- self._rpc_sk = self._make_sk()
- util.set_cloexec(self._rpc_sk)
- _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
-
- def __getattr__(self, attr):
- return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
-
- def _make_sk(self):
- sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.connect((self._srv, rpc_port))
- return sk
-
- def open_socket(self, uname):
- sk = self._make_sk()
- host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
- c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
- c(host, uname)
- return sk
-
-
-
-#
-# Server
-#
-
-class _rpc_server_sk:
- def __init__(self, sk):
- self._sk = sk
- self._master = None
-
- def fileno(self):
- return self._sk.fileno()
-
- def hash_name(self):
- return self._sk.getpeername()
-
- def get_name(self, mgr):
- return self.hash_name()
-
- def work(self, mgr):
- raw_data = self._sk.recv(rpc_sk_buf)
- if not raw_data:
- mgr.remove(self)
- if self._master:
- self._master.on_disconnect()
- self._sk.close()
- return
-
- data = eval(raw_data)
- try:
- if data[0] == RPC_CALL:
- if not self._master:
- raise Exception("Proto seq error")
-
- res = getattr(self._master, "rpc_" + data[1])(*data[2])
- elif data[0] == RPC_CMD:
- res = getattr(self, data[1])(mgr, *data[2])
- else:
- raise Exception(("Proto typ error", data[0]))
- except Exception as e:
- traceback.print_exc()
- res = (RPC_EXC, e)
- else:
- res = (RPC_RESP, res)
-
- raw_data = repr(res)
- self._sk.send(raw_data)
-
- def init_rpc(self, mgr, args):
- util.set_cloexec(self)
- self._master = mgr.make_master()
- self._master.on_connect(*args)
-
- def pick_channel(self, mgr, hash_name, uname):
- sk = mgr.pick_sk(hash_name)
- if sk:
- self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
- def __init__(self):
- sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.bind(("0.0.0.0", rpc_port))
- sk.listen(8)
- self._sk = sk
- util.set_cloexec(self)
-
- def fileno(self):
- return self._sk.fileno()
-
- def work(self, mgr):
- sk, addr = self._sk.accept()
- mgr.add(_rpc_server_sk(sk))
-
-class _rpc_stop_fd:
- def __init__(self, fd):
- self._fd = fd
-
- def fileno(self):
- return self._fd.fileno()
-
- def work(self, mgr):
- mgr.stop()
-
-class _rpc_server_manager:
- def __init__(self, srv_class):
- self._srv_class = srv_class
- self._sk_by_name = {}
- self._poll_list = [_rpc_server_ask()]
- self._alive = True
-
- def add(self, sk):
- self._sk_by_name[sk.hash_name()] = sk
- self._poll_list.append(sk)
-
- def remove(self, sk):
- self._sk_by_name.pop(sk.hash_name())
- self._poll_list.remove(sk)
-
- def pick_sk(self, hash_name):
- sk = self._sk_by_name.pop(hash_name, None)
- if sk:
- self._poll_list.remove(sk)
- return sk
-
- def make_master(self):
- return self._srv_class()
-
- def stop(self):
- self._alive = False
-
- def loop(self, stop_fd):
- if stop_fd:
- self._poll_list.append(_rpc_stop_fd(stop_fd))
-
- while self._alive:
- r, w, x = select.select(self._poll_list, [], [])
- for sk in r:
- sk.work(self)
-
- print "RPC Service stops"
-
-class rpc_threaded_srv(threading.Thread):
- def __init__(self, srv_class):
- threading.Thread.__init__(self)
- self._mgr = _rpc_server_manager(srv_class)
- self._stop_fd = None
-
- def run(self):
- self._mgr.loop(self._stop_fd)
-
- def get_stop_fd(self):
- sks = socket.socketpair()
- self._stop_fd = sks[0]
- return sks[1]
--
1.9.3
More information about the CRIU
mailing list