[CRIU] [PATCH 5/5] p.haul: reform project to simplify installation

nspiridonov nspiridonov at odin.com
Tue Apr 28 08:35:15 PDT 2015


From: jne100 <jne100 at gmail.com>

To simplify installation and packaging, move all Python logic into the
phaul subdirectory. Add the setup.py and phaul/__init__.py files needed
for packaging. These changes pack the greater part of the logic into the
'phaul' Python package, which can be installed using setuptools and
used via the import statement.

Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
---
 .gitignore               |    1 +
 criu_api.py              |  109 ----------------------
 fs_haul_shared.py        |   28 ------
 fs_haul_subtree.py       |   50 ----------
 images.py                |  176 -----------------------------------
 mstats.py                |   33 -------
 p.haul                   |   14 ++--
 p.haul-service           |    8 +-
 p_haul_cgroup.py         |   79 ----------------
 p_haul_iters.py          |  232 ----------------------------------------------
 p_haul_lxc.py            |  166 ---------------------------------
 p_haul_ovz.py            |   18 ----
 p_haul_pid.py            |   84 -----------------
 p_haul_service.py        |  154 ------------------------------
 p_haul_type.py           |   35 -------
 p_haul_vz.py             |  189 -------------------------------------
 phaul/criu_api.py        |  109 ++++++++++++++++++++++
 phaul/fs_haul_shared.py  |   28 ++++++
 phaul/fs_haul_subtree.py |   50 ++++++++++
 phaul/images.py          |  176 +++++++++++++++++++++++++++++++++++
 phaul/mstats.py          |   33 +++++++
 phaul/p_haul_cgroup.py   |   79 ++++++++++++++++
 phaul/p_haul_iters.py    |  232 ++++++++++++++++++++++++++++++++++++++++++++++
 phaul/p_haul_lxc.py      |  166 +++++++++++++++++++++++++++++++++
 phaul/p_haul_ovz.py      |   18 ++++
 phaul/p_haul_pid.py      |   84 +++++++++++++++++
 phaul/p_haul_service.py  |  154 ++++++++++++++++++++++++++++++
 phaul/p_haul_type.py     |   35 +++++++
 phaul/p_haul_vz.py       |  189 +++++++++++++++++++++++++++++++++++++
 phaul/util.py            |   46 +++++++++
 phaul/xem_rpc.py         |  197 +++++++++++++++++++++++++++++++++++++++
 setup.py                 |    9 ++
 util.py                  |   46 ---------
 xem_rpc.py               |  197 ---------------------------------------
 34 files changed, 1617 insertions(+), 1607 deletions(-)
 delete mode 100644 criu_api.py
 delete mode 100644 fs_haul_shared.py
 delete mode 100644 fs_haul_subtree.py
 delete mode 100644 images.py
 delete mode 100644 mstats.py
 delete mode 100644 p_haul_cgroup.py
 delete mode 100644 p_haul_iters.py
 delete mode 100644 p_haul_lxc.py
 delete mode 100644 p_haul_ovz.py
 delete mode 100644 p_haul_pid.py
 delete mode 100644 p_haul_service.py
 delete mode 100644 p_haul_type.py
 delete mode 100644 p_haul_vz.py
 create mode 100644 phaul/__init__.py
 create mode 100644 phaul/criu_api.py
 create mode 100644 phaul/fs_haul_shared.py
 create mode 100644 phaul/fs_haul_subtree.py
 create mode 100644 phaul/images.py
 create mode 100644 phaul/mstats.py
 create mode 100644 phaul/p_haul_cgroup.py
 create mode 100644 phaul/p_haul_iters.py
 create mode 100644 phaul/p_haul_lxc.py
 create mode 100644 phaul/p_haul_ovz.py
 create mode 100644 phaul/p_haul_pid.py
 create mode 100644 phaul/p_haul_service.py
 create mode 100644 phaul/p_haul_type.py
 create mode 100644 phaul/p_haul_vz.py
 create mode 100644 phaul/util.py
 create mode 100644 phaul/xem_rpc.py
 create mode 100644 setup.py
 delete mode 100644 util.py
 delete mode 100644 xem_rpc.py

diff --git a/.gitignore b/.gitignore
index 941b808..739cb45 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+build/
 *.pyc
 rpc_pb2.py
 stats_pb2.py
diff --git a/criu_api.py b/criu_api.py
deleted file mode 100644
index a2f70b7..0000000
--- a/criu_api.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# CRIU API
-# Includes class to work with CRIU service and helpers
-#
-
-import socket
-import struct
-import os
-import util
-import subprocess
-import pycriu.rpc as cr_rpc
-import pycriu.images.stats_pb2 as crs
-
-criu_binary = "criu"
-
-req_types = {
-	cr_rpc.DUMP: "dump",
-	cr_rpc.PRE_DUMP: "pre_dump",
-	cr_rpc.PAGE_SERVER: "page_server",
-	cr_rpc.RESTORE: "restore",
-	cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
-	cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
-}
-
-cpuinfo_img_name = "cpuinfo.img"
-
-def_verb = 2
-
-#
-# Connection to CRIU service
-#
-
-class criu_conn:
-	def __init__(self, mem_sk):
-		self._iter = 0
-		self.verb = def_verb
-		css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
-		util.set_cloexec(css[1])
-		print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
-		self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
-		css[0].close()
-		self._cs = css[1]
-		self._last_req = -1
-		self._mem_fd = mem_sk.fileno()
-
-	def close(self):
-		self._cs.close()
-		self._swrk.wait()
-
-	def mem_sk_fileno(self):
-		return self._mem_fd
-
-	def verbose(self, level):
-		self.verb = level
-
-	def _recv_resp(self):
-		resp = cr_rpc.criu_resp()
-		resp.ParseFromString(self._cs.recv(1024))
-		if not resp.type in (cr_rpc.NOTIFY, self._last_req):
-			raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
-
-		return resp
-
-	def send_req(self, req):
-		req.opts.log_level = self.verb
-		req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
-		self._cs.send(req.SerializeToString())
-		self._iter += 1
-		self._last_req = req.type
-
-		return self._recv_resp()
-
-	def ack_notify(self, success = True):
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.NOTIFY
-		req.notify_success = True
-		self._cs.send(req.SerializeToString())
-
-		return self._recv_resp()
-
-#
-# Helper to read CRIU-generated statistics
-#
-
-CRIU_STATS_MAGIC = 0x57093306
-
-def criu_get_stats(img, file_name):
-	s = struct.Struct("I I")
-	f = open(os.path.join(img.work_dir(), file_name))
-	#
-	# Stats file is 4 butes of magic, then 4 bytes with
-	# stats packet size
-	#
-	v = s.unpack(f.read(s.size))
-	if v[0] != CRIU_STATS_MAGIC:
-		raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
-
-	stats = crs.stats_entry()
-	stats.ParseFromString(f.read(v[1]))
-
-	return stats
-
-def criu_get_dstats(img):
-	stats = criu_get_stats(img, "stats-dump")
-	return stats.dump
-
-def criu_get_rstats(img):
-	stats = criu_get_stats(img, "stats-restore")
-	return stats.restore
diff --git a/fs_haul_shared.py b/fs_haul_shared.py
deleted file mode 100644
index 54e231a..0000000
--- a/fs_haul_shared.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Shared FS hauler (noop)
-#
-
-class p_haul_fs:
-	def __init__(self):
-		print "Initilized shared FS hauler"
-		pass
-
-	def set_target_host(self, thost):
-		pass
-
-	def set_work_dir(self, wdir):
-		pass
-
-	def start_migration(self):
-		pass
-
-	def next_iteration(self):
-		pass
-
-	def stop_migration(self):
-		pass
-
-	# Inode numbers do not change on this FS
-	# during migration
-	def persistent_inodes(self):
-		return True
diff --git a/fs_haul_subtree.py b/fs_haul_subtree.py
deleted file mode 100644
index 3181c79..0000000
--- a/fs_haul_subtree.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# FS haul driver, that copies the subtree from
-# one node to another using rsync. It's used in
-# legacy OpenVZ configurations.
-#
-
-import subprocess as sp
-import os
-
-rsync_log_file = "rsync.log"
-
-class p_haul_fs:
-	def __init__(self, subtree_path):
-		print "Initialized subtree FS hauler (%s)" % subtree_path
-		self.__root = subtree_path
-		pass
-
-	def set_target_host(self, thost):
-		self.__thost = thost
-
-	def set_work_dir(self, wdir):
-		self.__wdir = wdir
-
-	def __run_rsync(self):
-		logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
-		dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
-
-		# First rsync might be very long. Wait for it not
-		# to produce big pause between the 1st pre-dump and
-		# .stop_migration
-
-		ret = sp.call(["rsync", "-a", self.__root, dst],
-				stdout = logf, stderr = logf)
-		if ret != 0:
-			raise Exception("Rsync failed")
-
-	def start_migration(self):
-		print "Starting FS migration"
-		self.__run_rsync()
-
-	def next_iteration(self):
-		pass
-
-	def stop_migration(self):
-		print "Doing final FS sync"
-		self.__run_rsync()
-
-	# When rsync-ing FS inodes number will change
-	def persistent_inodes(self):
-		return False
diff --git a/images.py b/images.py
deleted file mode 100644
index a4d465c..0000000
--- a/images.py
+++ /dev/null
@@ -1,176 +0,0 @@
-#
-# images driver for migration (without FS transfer)
-#
-
-import os
-import tempfile
-import tarfile
-import time
-import shutil
-import time
-import threading
-import util
-import criu_api
-
-def_path = "/var/local/p.haul-fs/"
-
-class opendir:
-	def __init__(self, path):
-		self._dirname = path
-		self._dirfd = os.open(path, os.O_DIRECTORY)
-		util.set_cloexec(self)
-
-	def close(self):
-		os.close(self._dirfd)
-		os._dirname = None
-		os._dirfd = -1
-
-	def name(self):
-		return self._dirname
-
-	def fileno(self):
-		return self._dirfd
-
-class untar_thread(threading.Thread):
-	def __init__(self, sk, tdir):
-		threading.Thread.__init__(self)
-		self.__sk = sk
-		self.__dir = tdir
-
-	def run(self):
-		tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
-		tf.extractall(self.__dir)
-		tf.close()
-
-class img_tar:
-	def __init__(self, sk, dirname):
-		self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
-		self.__dir = dirname
-
-	def add(self, img, path = None):
-		if not path:
-			path = os.path.join(self.__dir, img)
-
-		self.__tf.add(path, img)
-
-	def close(self):
-		self.__tf.close()
-
-class phaul_images:
-	WDIR = 1
-	IMGDIR = 2
-
-	def __init__(self, typ):
-		self.current_iter = 0
-		self.sync_time = 0.0
-		self._typ = typ
-		self._keep_on_close = False
-		self._wdir = None
-		self._current_dir = None
-
-	def save_images(self):
-		print "Keeping images"
-		self._keep_on_close = True
-
-	def set_options(self, opts):
-		self._keep_on_close = opts["keep_images"]
-
-		suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
-		util.makedirs(opts["img_path"])
-		wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
-		self._wdir = opendir(wdir)
-		self._img_path = os.path.join(self._wdir.name(), "img")
-		os.mkdir(self._img_path)
-
-	def close(self):
-		if not self._wdir:
-			return
-
-		self._wdir.close()
-		if self._current_dir:
-			self._current_dir.close()
-
-		if not self._keep_on_close:
-			print "Removing images"
-			shutil.rmtree(self._wdir.name())
-		else:
-			print "Images are kept in %s" % self._wdir.name()
-		pass
-
-	def img_sync_time(self):
-		return self.sync_time
-
-	def new_image_dir(self):
-		if self._current_dir:
-			self._current_dir.close()
-		self.current_iter += 1
-		img_dir = "%s/%d" % (self._img_path, self.current_iter)
-		print "\tMaking directory %s" % img_dir
-		os.mkdir(img_dir)
-		self._current_dir = opendir(img_dir)
-
-	def image_dir_fd(self):
-		return self._current_dir.fileno()
-
-	def work_dir_fd(self):
-		return self._wdir.fileno()
-
-	def image_dir(self):
-		return self._current_dir.name()
-
-	def work_dir(self):
-		return self._wdir.name()
-
-	def prev_image_dir(self):
-		if self.current_iter == 1:
-			return None
-		else:
-			return "../%d" % (self.current_iter - 1)
-
-	# Images transfer
-	# Are there better ways for doing this?
-
-	def sync_imgs_to_target(self, th, htype, sock):
-		# Pre-dump doesn't generate any images (yet?)
-		# so copy only those from the top dir
-		print "Sending images to target"
-
-		start = time.time()
-		cdir = self.image_dir()
-
-		th.start_accept_images(phaul_images.IMGDIR)
-		tf = img_tar(sock, cdir)
-
-		print "\tPack"
-		for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
-			tf.add(img)
-
-		print "\tAdd htype images"
-		for himg in htype.get_meta_images(cdir):
-			tf.add(himg[1], himg[0])
-
-		tf.close()
-		th.stop_accept_images()
-
-		self.sync_time = time.time() - start
-
-	def send_cpuinfo(self, th, sock):
-		th.start_accept_images(phaul_images.WDIR)
-		tf = img_tar(sock, self.work_dir())
-		tf.add(criu_api.cpuinfo_img_name)
-		tf.close()
-		th.stop_accept_images()
-
-	def start_accept_images(self, dir_id, sk):
-		if dir_id == phaul_images.WDIR:
-			dirname = self.work_dir()
-		else:
-			dirname = self.image_dir()
-
-		self.__acc_tar = untar_thread(sk, dirname)
-		self.__acc_tar.start()
-		print "Started images server"
-
-	def stop_accept_images(self):
-		print "Waiting for images to unpack"
-		self.__acc_tar.join()
diff --git a/mstats.py b/mstats.py
deleted file mode 100644
index f1a9c32..0000000
--- a/mstats.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import time
-
-def usec2sec(usec):
-	return usec / 1000000.
-
-class migration_stats:
-	def __init__(self):
-		self._iter_fr_times = []
-		self._frozen_time = 0
-
-	def start(self):
-		self._start_time = time.time()
-
-	def stop(self, iters):
-		self._rst_time = iters.th.restore_time()
-		self._img_sync_time = iters.img.img_sync_time()
-		self._end_time = time.time()
-
-		self._print_stats()
-
-	def iteration(self, stats):
-		print "Dumped %d pages, %d skipped" % \
-				(stats.pages_written, stats.pages_skipped_parent)
-
-		self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
-		self._frozen_time += stats.frozen_time
-
-	def _print_stats(self):
-		print "Migration succeeded"
-		print "\t   total time is ~%.2lf sec" % (self._end_time - self._start_time)
-		print "\t  frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
-		print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
-		print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/p.haul b/p.haul
index 859ba18..415c591 100755
--- a/p.haul
+++ b/p.haul
@@ -1,10 +1,10 @@
 #!/usr/bin/env python
 import sys
 import argparse
-import p_haul_iters as ph_iters
-import images
-import criu_api
-import xem_rpc
+import phaul.p_haul_iters as ph_iters
+import phaul.images as ph_images
+import phaul.criu_api as ph_criu_api
+import phaul.xem_rpc as ph_xem_rpc
 
 # Usage idea
 # p.haul <type> <id> <destination>
@@ -22,13 +22,13 @@ parser = argparse.ArgumentParser("Process HAULer")
 parser.add_argument("type", help = "Type of hat to haul, e.g. ovz")
 parser.add_argument("id", help = "ID of what to haul")
 parser.add_argument("to", help = "IP where to haul")
-parser.add_argument("-v", help = "Verbosity level", default = criu_api.def_verb, type = int, dest = "verbose")
+parser.add_argument("-v", help = "Verbosity level", default = ph_criu_api.def_verb, type = int, dest = "verbose")
 parser.add_argument("--keep-images", help = "Keep images after migration", default = False, action = 'store_true')
 parser.add_argument("--dst-rpid", help = "Write pidfile on restore", default = None)
-parser.add_argument("--img-path", help = "Dirctory where to put images", default = images.def_path)
+parser.add_argument("--img-path", help = "Dirctory where to put images", default = ph_images.def_path)
 parser.add_argument("--pid-root", help = "Path to tree's FS root")
 parser.add_argument("--force", help = "Don't do any sanity (CPU compat) checks", default = False, action = 'store_true')
-parser.add_argument("--port", help = "Port where to haul", type = int, default = xem_rpc.rpc_port)
+parser.add_argument("--port", help = "Port where to haul", type = int, default = ph_xem_rpc.rpc_port)
 
 args = vars(parser.parse_args())
 
diff --git a/p.haul-service b/p.haul-service
index 0fd52c2..210b7da 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -1,14 +1,14 @@
 #!/usr/bin/env python
 
 import signal
-import xem_rpc
-import p_haul_service as ph_srv
 import argparse
+import phaul.xem_rpc as ph_xem_rpc
+import phaul.p_haul_service as ph_srv
 
 if __name__ == "__main__":
 	parser = argparse.ArgumentParser("Process HAULer service server")
 	parser.add_argument("--bind-addr", help = "IP to bind to", type = str, default = "0.0.0.0")
-	parser.add_argument("--bind-port", help = "Port to bind to", type = int, default = xem_rpc.rpc_port)
+	parser.add_argument("--bind-port", help = "Port to bind to", type = int, default = ph_xem_rpc.rpc_port)
 
 	args = vars(parser.parse_args())
 
@@ -20,7 +20,7 @@ if __name__ == "__main__":
 		sfd.close()
 
 	print "Starting p.haul rpyc service"
-	t = xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
+	t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
 
 	# FIXME: Setup stop handlers
 	sfd = t.get_stop_fd()
diff --git a/p_haul_cgroup.py b/p_haul_cgroup.py
deleted file mode 100644
index 846ee62..0000000
--- a/p_haul_cgroup.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# CGroups manipulations for p.haul.
-#
-# FIXME Isn't it nicer to do it via libcgroup?
-#
-
-import os
-
-cg_root_dir = "/sys/fs/cgroup"
-cg_tasks_file = "tasks"
-
-def cg_line_parse(ln):
-	items = ln.split(":")
-	#
-	# If two controllers are merged tigether, we see
-	# their names comma-separated in proc. The problem
-	# is that the respective directory name in sysfsis
-	# (!) IS NOT THE SAME, controller names can go
-	# reversed.
-	#
-	# That said, we just use the first name component,
-	# in sysfs there would the respective symlink
-	#
-	cname = items[1].split(",")[0]
-	cdir = items[2]
-
-	return cname, cdir
-
-def dump_hier(pid, img):
-	print "\tSave CG for %d into %s" % (pid, img)
-	fd = open(img, "w")
-	cg = open("/proc/%d/cgroup" % pid)
-	for ln in cg:
-		cg_controller, cg_dir = cg_line_parse(ln)
-		if not cg_controller.startswith("name="):
-			fd.write("%s%s" % (cg_controller, cg_dir))
-
-	cg.close()
-	fd.close()
-
-#
-# The cpuset controller is unusable before at least one
-# cpu and memory node is set there. For restore it's OK
-# to copy parent masks into it, at the end we'll apply
-# "real" CT config
-#
-
-def cpuset_copy_parent(path, c):
-	c = "cpuset.%s" % c
-	ppath = os.path.dirname(path)
-	pfd = open(os.path.join(ppath, c))
-	cfd = open(os.path.join(path, c), "w")
-	cfd.write(pfd.read())
-	cfd.close()
-	pfd.close()
-
-def cpuset_allow_all(path):
-	cpuset_copy_parent(path, "cpus")
-	cpuset_copy_parent(path, "mems")
-
-def restore_one_controller(pid, ln):
-	cg_path = os.path.join(cg_root_dir, ln.strip())
-	print "[%s]" % cg_path
-	if not os.access(cg_path, os.F_OK):
-		os.makedirs(cg_path)
-	if ln.startswith("cpuset"):
-		cpuset_allow_all(cg_path)
-
-	tf = open(os.path.join(cg_path, cg_tasks_file), "w")
-	tf.write("%d" % pid)
-	tf.close()
-
-def restore_hier(pid, img):
-	print "\tCreate hier for %d from %s" % (pid, img)
-	fd = open(img)
-	for ln in fd:
-		restore_one_controller(pid, ln)
-	fd.close()
-	pass
diff --git a/p_haul_iters.py b/p_haul_iters.py
deleted file mode 100644
index 7a898da..0000000
--- a/p_haul_iters.py
+++ /dev/null
@@ -1,232 +0,0 @@
-#
-# The P.HAUL core -- the class that drives migration
-#
-
-import images
-import mstats
-import xem_rpc
-import pycriu.rpc as cr_rpc
-import criu_api
-import p_haul_type
-
-# Constants for iterations management
-#
-# Maximum number of iterations
-phaul_iter_max = 8
-# If we dump less than this amount of pages we abort
-# iterations and go do the full dump
-phaul_iter_min_size = 64
-# Each iteration should dump less pages or at most
-# this % more than previous
-phaul_iter_grow_max = 10
-
-class phaul_iter_worker:
-	def __init__(self, p_type, host):
-		self._mstat = mstats.migration_stats()
-		self.iteration = 0
-		self.prev_stats = None
-
-		print "Connecting to target host"
-		self.th = xem_rpc.rpc_proxy(host)
-		self.data_sk = self.th.open_socket("datask")
-
-		print "Setting up local"
-		self.img = images.phaul_images("dmp")
-		self.criu = criu_api.criu_conn(self.data_sk)
-		self.htype = p_haul_type.get_src(p_type)
-		if not self.htype:
-			raise Exception("No htype driver found")
-
-		self.fs = self.htype.get_fs()
-		if not self.fs:
-			raise Exception("No FS driver found")
-
-		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host[0])
-
-		print "Setting up remote"
-		self.th.setup(p_type)
-
-
-	def make_dump_req(self, typ):
-		#
-		# Prepare generic request for (pre)dump
-		#
-
-		req = cr_rpc.criu_req()
-		req.type = typ
-		req.opts.pid = self.pid
-		req.opts.ps.fd = self.criu.mem_sk_fileno()
-		req.opts.track_mem = True
-
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		p_img = self.img.prev_image_dir()
-		if p_img:
-			req.opts.parent_img = p_img
-		if not self.fs.persistent_inodes():
-			req.opts.force_irmap = True
-
-		return req
-
-	def set_options(self, opts):
-		self.th.set_options(opts)
-		self.criu.verbose(opts["verbose"])
-		self.img.set_options(opts)
-		self.htype.set_options(opts)
-		self.__force = opts["force"]
-
-	def validate_cpu(self):
-		print "Checking CPU compatibility"
-
-		print "  `- Dumping CPU info"
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.CPUINFO_DUMP
-		req.opts.images_dir_fd = self.img.work_dir_fd()
-		req.keep_open = True
-		resp = self.criu.send_req(req)
-		if not resp.success:
-			raise Exception("Can't dump cpuinfo")
-
-		print "  `- Sending CPU info"
-		self.img.send_cpuinfo(self.th, self.data_sk)
-
-		print "  `- Checking CPU info"
-		if not self.th.check_cpuinfo():
-			raise Exception("CPUs mismatch")
-
-	def start_migration(self):
-		self._mstat.start()
-
-		if not self.__force:
-			self.validate_cpu()
-
-		print "Preliminary FS migration"
-		self.fs.set_work_dir(self.img.work_dir())
-		self.fs.start_migration()
-
-		print "Starting iterations"
-		cc = self.criu
-
-		while True:
-			print "* Iteration %d" % self.iteration
-
-			self.th.start_iter()
-			self.img.new_image_dir()
-
-			print "\tIssuing pre-dump command to service"
-
-			req = self.make_dump_req(cr_rpc.PRE_DUMP)
-			resp = cc.send_req(req)
-			if not resp.success:
-				raise Exception("Pre-dump failed")
-
-			print "\tPre-dump succeeded"
-
-			self.th.end_iter()
-
-			stats = criu_api.criu_get_dstats(self.img)
-			self._mstat.iteration(stats)
-
-			#
-			# Need to decide whether we do next iteration
-			# or stop on the existing and go do full dump
-			# and restore
-			#
-
-			print "Checking iteration progress:"
-
-			if stats.pages_written <= phaul_iter_min_size:
-				print "\t> Small dump"
-				break;
-
-			if self.prev_stats:
-				w_add = stats.pages_written - self.prev_stats.pages_written
-				w_add = w_add * 100 / self.prev_stats.pages_written
-				if w_add > phaul_iter_grow_max:
-					print "\t> Iteration grows"
-					break
-
-			if self.iteration >= phaul_iter_max:
-				print "\t> Too many iterations"
-				break
-
-			self.iteration += 1
-			self.prev_stats = stats
-			print "\t> Proceed to next iteration"
-
-			self.fs.next_iteration()
-
-		#
-		# Finish with iterations -- do full dump, send images
-		# to target host and restore from them there
-		#
-
-		print "Final dump and restore"
-
-		self.th.start_iter()
-		self.img.new_image_dir()
-
-		print "\tIssuing dump command to service"
-		req = self.make_dump_req(cr_rpc.DUMP)
-		req.opts.notify_scripts = True
-		req.opts.file_locks = True
-		req.opts.evasive_devices = True
-		req.opts.link_remap = True
-		if self.htype.can_migrate_tcp():
-			req.opts.tcp_established = True
-
-		resp = cc.send_req(req)
-		while True:
-			if resp.type != cr_rpc.NOTIFY:
-				raise Exception("Dump failed")
-
-			if resp.notify.script == "post-dump":
-				#
-				# Dump is effectively over. Now CRIU
-				# waits for us to do whatever we want
-				# and keeps the tasks frozen.
-				#
-				break
-
-			elif resp.notify.script == "network-lock":
-				self.htype.net_lock()
-			elif resp.notify.script == "network-unlock":
-				self.htype.net_unlock()
-
-			print "\t\tNotify (%s)" % resp.notify.script
-			resp = cc.ack_notify()
-
-		print "Dump complete"
-		self.th.end_iter()
-
-		#
-		# Dump is complete -- go to target node,
-		# restore them there and kill (if required)
-		# tasks on source node
-		#
-
-		print "Final FS and images sync"
-		self.fs.stop_migration()
-		self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
-
-		print "Asking target host to restore"
-		self.th.restore_from_images()
-
-		#
-		# Ack the notify after restore -- CRIU would
-		# then terminate all tasks and send us back
-		# DUMP/success message
-		#
-
-		resp = cc.ack_notify()
-		if not resp.success:
-			raise Exception("Dump screwed up")
-
-		self.htype.umount()
-
-		stats = criu_api.criu_get_dstats(self.img)
-		self._mstat.iteration(stats)
-		self._mstat.stop(self)
-		self.img.close()
-		cc.close()
diff --git a/p_haul_lxc.py b/p_haul_lxc.py
deleted file mode 100644
index 27df698..0000000
--- a/p_haul_lxc.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#
-# LinuX Containers hauler module
-#
-
-import os
-import shutil
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-from subprocess import Popen, PIPE
-
-name = "lxc"
-lxc_dir = "/var/lib/lxc/"
-lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
-cg_image_name = "lxccg.img"
-
-class p_haul_type:
-	def __init__(self, name):
-		self._ctname = name
-		#
-		# This list would contain (v_in, v_out, v_br) tuples where
-		# v_in is the name of veth device in CT
-		# v_out is its peer on the host
-		# v_bridge is the bridge to which thie veth is attached
-		#
-		self._veths = []
-		self._cfg = {}
-
-	def __load_ct_config(self):
-		print "Loading config file from %s" % self.__ct_config()
-
-		self._cfg = {}
-		self._veths = []
-
-		veth = None
-
-		ifd = open(self.__ct_config())
-		for line in ifd:
-			if not ("=" in line):
-				continue
-			k, v = map(lambda a: a.strip(), line.split("=", 1))
-			self._cfg[k] = v
-
-			if k == "lxc.network.type":
-				if v != "veth":
-					raise Exception("Unsupported network device type: %s", v)
-				if veth:
-					self._veths.append(veth)
-				veth = util.net_dev()
-			elif k == "lxc.network.link":
-				veth.link = v
-			elif k == "lxc.network.name":
-				veth.name = v
-			elif k == "lxc.network.veth.pair":
-				veth.pair = v
-		if veth:
-			self._veths.append(veth)
-		ifd.close()
-
-	def __apply_cg_config(self):
-		print "Applying CT configs"
-		# FIXME -- implement
-		pass
-
-	def init_src(self):
-		self._fs_mounted = True
-		self._bridged = True
-		self.__load_ct_config()
-
-	def init_dst(self):
-		self._fs_mounted = False
-		self._bridged = False
-		self.__load_ct_config()
-
-	def set_options(self, opts):
-		pass
-
-	def root_task_pid(self):
-		pid = -1;
-
-		pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
-		for l in pd.stdout:
-			if l.startswith("PID:"):
-				pid = int(l.split(":")[1])
-		status = pd.wait()
-		if status:
-			raise Exception("lxc info -n %s failed: %d" %
-						(self._ctname, status))
-		if pid == -1:
-			raise Exception("CT isn't running")
-		return pid
-
-	def __ct_rootfs(self):
-		return self._cfg['lxc.rootfs']
-
-	def __ct_root(self):
-		return os.path.join(lxc_rootfs_dir, self._ctname)
-
-	def __ct_config(self):
-		return os.path.join(lxc_dir, self._ctname, "config")
-
-	#
-	# Meta-images for LXC -- container config and info about CGroups
-	#
-	def get_meta_images(self, dir):
-		cg_img = os.path.join(dir, cg_image_name)
-		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
-		cfg_name = self.__ct_config()
-		return [ (cfg_name, "config"),
-			 (cg_img, cg_image_name) ]
-
-	def put_meta_images(self, dir):
-		print "Putting config file into %s" % lxc_dir
-
-		shutil.copy(os.path.join(dir, "config"), self.__ct_config())
-
-		# Keep this name, we'll need one in prepare_ct()
-		self.cg_img = os.path.join(dir, cg_image_name)
-
-	#
-	# Create cgroup hierarchy and put root task into it
-	# Hierarchy is unlimited, we will apply config limitations
-	# in ->restored->__apply_cg_config later
-	#
-	def prepare_ct(self, pid):
-		p_haul_cgroup.restore_hier(pid, self.cg_img)
-
-	def mount(self):
-		nroot = self.__ct_root()
-		print "Mounting CT root to %s" % nroot
-		if not os.access(nroot, os.F_OK):
-			os.makedirs(nroot)
-		os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
-		self._fs_mounted = True
-		return nroot
-
-	def get_fs(self):
-		return fs_haul_shared.p_haul_fs()
-
-	def restored(self, pid):
-		self.__apply_cg_config()
-
-	def net_lock(self):
-		for veth in self._veths:
-			util.ifdown(veth.pair)
-
-	def net_unlock(self):
-		for veth in self._veths:
-			util.ifup(veth.pair)
-			if veth.link and not self._bridged:
-				util.bridge_add(veth.pair, veth.link)
-
-	def can_migrate_tcp(self):
-		return True
-
-	def umount(self):
-		pass
-
-	def veths(self):
-		#
-		# Caller wants to see list of tuples with [0] being name
-		# in CT and [1] being name on host. Just return existing
-		# tuples, the [2] with bridge name wouldn't hurt
-		#
-		return self._veths
diff --git a/p_haul_ovz.py b/p_haul_ovz.py
deleted file mode 100644
index bf719d0..0000000
--- a/p_haul_ovz.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# OpenVZ containers hauler module
-#
-
-import os
-import p_haul_vz
-
-name = "ovz"
-vzpid_dir = "/var/lib/vzctl/vepid/"
-
-class p_haul_type(p_haul_vz.p_haul_type):
-	def __init__(self, ctid):
-		p_haul_vz.p_haul_type.__init__(self, ctid)
-
-	def root_task_pid(self):
-		with open(os.path.join(vzpid_dir, self._ctid)) as pf:
-			pid = pf.read()
-			return int(pid)
diff --git a/p_haul_pid.py b/p_haul_pid.py
deleted file mode 100644
index f0051b0..0000000
--- a/p_haul_pid.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Individual process hauler
-#
-
-import fs_haul_shared
-
-name = "pid"
-
-class p_haul_type:
-	def __init__(self, id):
-		self.pid = int(id)
-		self._pidfile = None
-
-
-	#
-	# Initialize itself for source node or destination one
-	#
-	def init_src(self):
-		pass
-	def init_dst(self):
-		pass
-
-	def set_options(self, opts):
-		self._pidfile = opts["dst_rpid"]
-		self._fs_root = opts["pid_root"]
-
-	# Report the pid of the root task of what we're
-	# goung to migrate
-	def root_task_pid(self):
-		return self.pid
-
-	# Prepare filesystem before restoring. Retrun
-	# the new root, if required. 'None' will mean
-	# that things will get restored in the current
-	# mount namespace and w/o chroot
-	def mount(self):
-		return self._fs_root
-
-	# Remove any specific FS setup
-	def umount(self):
-		pass
-
-	# Get driver for FS migration
-	def get_fs(self):
-		return fs_haul_shared.p_haul_fs()
-
-	# Get list of files which should be copied to
-	# the destination node. The dir argument is where
-	# temporary stuff can be put
-	def get_meta_images(self, dir):
-		return []
-
-	# Take your files from dir and put in whatever
-	# places are appropriate. Paths (relative) are
-	# preserved.
-	def put_meta_images(self, dir):
-		pass
-
-	# Things are started to get restored, only the
-	# first task (with pid @pid) is created.
-	def prepare_ct(self, pid):
-		pass
-
-	# Restoring done, the new top task has pid pid
-	def restored(self, pid):
-		if self._pidfile:
-			print "Writing rst pidfile"
-			open(self._pidfile, "w").writelines(["%d" % pid])
-
-	#
-	# Lock and unlock networking
-	#
-	def net_lock(self):
-		pass
-
-	def net_unlock(self):
-		pass
-
-	def can_migrate_tcp(self):
-		return False
-
-	# Get list of veth pairs if any
-	def veths(self):
-		return []
diff --git a/p_haul_service.py b/p_haul_service.py
deleted file mode 100644
index d3e19ff..0000000
--- a/p_haul_service.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#
-# P.HAUL code, that helps on the target node (rpyc service)
-#
-
-import xem_rpc
-import pycriu.rpc as cr_rpc
-import images
-import criu_api
-import p_haul_type
-
-class phaul_service:
-	def on_connect(self):
-		print "Connected"
-		self.dump_iter = 0
-		self.restored = False
-		self.criu = None
-		self.data_sk = None
-		self.img = None
-		self.htype = None
-
-	def on_disconnect(self):
-		print "Disconnected"
-		if self.criu:
-			self.criu.close()
-
-		if self.data_sk:
-			self.data_sk.close()
-
-		if self.htype and not self.restored:
-			self.htype.umount()
-
-		if self.img:
-			print "Closing images"
-			if not self.restored:
-				self.img.save_images()
-			self.img.close()
-
-	def on_socket_open(self, sk, uname):
-		self.data_sk = sk
-		print "Data socket (%s) accepted" % uname
-
-	def rpc_setup(self, htype_id):
-		print "Setting up service side", htype_id
-		self.img = images.phaul_images("rst")
-		self.criu = criu_api.criu_conn(self.data_sk)
-		self.htype = p_haul_type.get_dst(htype_id)
-
-	def rpc_set_options(self, opts):
-		self.criu.verbose(opts["verbose"])
-		self.img.set_options(opts)
-		self.htype.set_options(opts)
-
-	def start_page_server(self):
-		print "Starting page server for iter %d" % self.dump_iter
-
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.PAGE_SERVER
-		req.keep_open = True
-		req.opts.ps.fd = self.criu.mem_sk_fileno()
-
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		p_img = self.img.prev_image_dir()
-		if p_img:
-			req.opts.parent_img = p_img
-
-		print "\tSending criu rpc req"
-		resp = self.criu.send_req(req)
-		if not resp.success:
-			raise Exception("Failed to start page server")
-
-		print "\tPage server started at %d" % resp.ps.pid
-
-	def rpc_start_iter(self):
-		self.dump_iter += 1
-		self.img.new_image_dir()
-		self.start_page_server()
-
-	def rpc_end_iter(self):
-		pass
-
-	def rpc_start_accept_images(self, dir_id):
-		self.img.start_accept_images(dir_id, self.data_sk)
-
-	def rpc_stop_accept_images(self):
-		self.img.stop_accept_images()
-
-	def rpc_check_cpuinfo(self):
-		print "Checking cpuinfo"
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.CPUINFO_CHECK
-		req.opts.images_dir_fd = self.img.work_dir_fd()
-		req.keep_open = True
-		resp = self.criu.send_req(req)
-		print "   `-", resp.success
-		return resp.success
-
-	def rpc_restore_from_images(self):
-		print "Restoring from images"
-		self.htype.put_meta_images(self.img.image_dir())
-
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.RESTORE
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		req.opts.notify_scripts = True
-
-		if self.htype.can_migrate_tcp():
-			req.opts.tcp_established = True
-
-		for veth in self.htype.veths():
-			v = req.opts.veths.add()
-			v.if_in = veth.name
-			v.if_out = veth.pair
-
-		nroot = self.htype.mount()
-		if nroot:
-			req.opts.root = nroot
-			print "Restore root set to %s" % req.opts.root
-
-		cc = self.criu
-		resp = cc.send_req(req)
-		while True:
-			if resp.type == cr_rpc.NOTIFY:
-				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
-				if resp.notify.script == "setup-namespaces":
-					#
-					# At that point we have only one task
-					# living in namespaces and waiting for
-					# us to ACK the notify. Htype might want
-					# to configure namespace (external net
-					# devices) and cgroups
-					#
-					self.htype.prepare_ct(resp.notify.pid)
-				elif resp.notify.script == "network-unlock":
-					self.htype.net_unlock()
-				elif resp.notify.script == "network-lock":
-					raise Exception("Locking network on restore?")
-
-				resp = cc.ack_notify()
-				continue
-
-			if not resp.success:
-				raise Exception("Restore failed")
-
-			print "Restore succeeded"
-			break
-
-		self.htype.restored(resp.restore.pid)
-		self.restored = True
-
-	def rpc_restore_time(self):
-		stats = criu_api.criu_get_rstats(self.img)
-		return stats.restore_time
diff --git a/p_haul_type.py b/p_haul_type.py
deleted file mode 100644
index 8cf1eb4..0000000
--- a/p_haul_type.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Haulers' type selector
-# Types (htypes) are classes, that help hauling processes.
-# See p_haul_pid for comments of how a class should look like.
-#
-
-import p_haul_vz
-import p_haul_ovz
-import p_haul_pid
-import p_haul_lxc
-
-haul_types = {
-	p_haul_vz.name: p_haul_vz,
-	p_haul_ovz.name: p_haul_ovz,
-	p_haul_pid.name: p_haul_pid,
-	p_haul_lxc.name: p_haul_lxc,
-}
-
-def __get(id):
-	if haul_types.has_key(id[0]):
-		h_type = haul_types[id[0]]
-		return h_type.p_haul_type(id[1])
-	else:
-		print "Unknown type. Try one of", haul_types.keys()
-	 	return None
-
-def get_src(id):
-	ht = __get(id)
-	ht.init_src()
-	return ht
-
-def get_dst(id):
-	ht = __get(id)
-	ht.init_dst()
-	return ht
diff --git a/p_haul_vz.py b/p_haul_vz.py
deleted file mode 100644
index 40dc943..0000000
--- a/p_haul_vz.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#
-# Virtuozzo containers hauler module
-#
-
-import os
-import shlex
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-
-name = "vz"
-vz_dir = "/vz"
-vzpriv_dir = "%s/private" % vz_dir
-vzroot_dir = "%s/root" % vz_dir
-vz_conf_dir = "/etc/vz/conf/"
-vz_pidfiles = "/var/lib/vzctl/vepid/"
-cg_image_name = "ovzcg.img"
-
-class p_haul_type:
-	def __init__(self, ctid):
-		self._ctid = ctid
-		#
-		# This list would contain (v_in, v_out, v_br) tuples where
-		# v_in is the name of veth device in CT
-		# v_out is its peer on the host
-		# v_bridge is the bridge to which thie veth is attached
-		#
-		self._veths = []
-		self._cfg = ""
-
-	def __load_ct_config(self, path):
-		print "Loading config file from %s" % path
-
-		with open(os.path.join(path, self.__ct_config())) as ifd:
-			self._cfg = ifd.read()
-
-		#
-		# Parse and keep veth pairs, later we will
-		# equip restore request with this data and
-		# will use it while (un)locking the network
-		#
-		config = parse_vz_config(self._cfg)
-		if "NETIF" in config:
-			v_in, v_out, v_bridge = None, None, None
-			for parm in config["NETIF"].split(","):
-				pa = parm.split("=")
-				if pa[0] == "ifname":
-					v_in = pa[1]
-				elif pa[0] == "host_ifname":
-					v_out = pa[1]
-				elif pa[0] == "bridge":
-					v_bridge = pa[1]
-			if v_in and v_out:
-				print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
-				self._veths.append(util.net_dev(v_in, v_out, v_bridge))
-
-	def __apply_cg_config(self):
-		print "Applying CT configs"
-		# FIXME -- implement
-		pass
-
-	def init_src(self):
-		self._fs_mounted = True
-		self._bridged = True
-		self.__load_ct_config(vz_conf_dir)
-
-	def init_dst(self):
-		self._fs_mounted = False
-		self._bridged = False
-
-	def set_options(self, opts):
-		pass
-
-	def root_task_pid(self):
-		# Expect first line of tasks file contain root pid of CT
-		path = "/sys/fs/cgroup/memory/{0}/tasks".format(self._ctid)
-		with open(path) as tasks:
-			pid = tasks.readline()
-			return int(pid)
-
-	def __ct_priv(self):
-		return "%s/%s" % (vzpriv_dir, self._ctid)
-
-	def __ct_root(self):
-		return "%s/%s" % (vzroot_dir, self._ctid)
-
-	def __ct_config(self):
-		return "%s.conf" % self._ctid
-
-	#
-	# Meta-images for OVZ -- container config and info about CGroups
-	#
-	def get_meta_images(self, path):
-		cg_img = os.path.join(path, cg_image_name)
-		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
-		cfg_name = self.__ct_config()
-		return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
-			 (cg_img, cg_image_name) ]
-
-	def put_meta_images(self, path):
-		print "Putting config file into %s" % vz_conf_dir
-
-		self.__load_ct_config(path)
-		with open(os.path.join(vz_conf_dir, self.__ct_config()), "w") as ofd:
-			ofd.write(self._cfg)
-
-		# Keep this name, we'll need one in prepare_ct()
-		self.cg_img = os.path.join(path, cg_image_name)
-
-	#
-	# Create cgroup hierarchy and put root task into it
-	# Hierarchy is unlimited, we will apply config limitations
-	# in ->restored->__apply_cg_config later
-	#
-	def prepare_ct(self, pid):
-		p_haul_cgroup.restore_hier(pid, self.cg_img)
-
-	def __umount_root(self):
-		print "Umounting CT root"
-		os.system("umount %s" % self.__ct_root())
-		self._fs_mounted = False
-
-	def mount(self):
-		nroot = self.__ct_root()
-		print "Mounting CT root to %s" % nroot
-		if not os.access(nroot, os.F_OK):
-			os.makedirs(nroot)
-		os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
-		self._fs_mounted = True
-		return nroot
-
-	def umount(self):
-		if self._fs_mounted:
-			self.__umount_root()
-
-	def get_fs(self):
-		rootfs = util.path_to_fs(self.__ct_priv())
-		if not rootfs:
-			print "CT is on unknown FS"
-			return None
-
-		print "CT is on %s" % rootfs
-
-		if rootfs == "nfs":
-			return fs_haul_shared.p_haul_fs()
-		if rootfs == "ext3" or rootfs == "ext4":
-			return fs_haul_subtree.p_haul_fs(self.__ct_priv())
-
-		print "Unknown CT FS"
-		return None
-
-	def restored(self, pid):
-		print "Writing pidfile"
-		pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
-		pidfile.write("%d" % pid)
-		pidfile.close()
-
-		self.__apply_cg_config()
-
-	def net_lock(self):
-		for veth in self._veths:
-			util.ifdown(veth.pair)
-
-	def net_unlock(self):
-		for veth in self._veths:
-			util.ifup(veth.pair)
-			if veth.link and not self._bridged:
-				util.bridge_add(veth.pair, veth.link)
-
-	def can_migrate_tcp(self):
-		return True
-
-	def veths(self):
-		#
-		# Caller wants to see list of tuples with [0] being name
-		# in CT and [1] being name on host. Just return existing
-		# tuples, the [2] with bridge name wouldn't hurt
-		#
-		return self._veths
-
-def parse_vz_config(body):
-	""" Parse shell-like virtuozzo config file"""
-
-	config_values = dict()
-	for token in shlex.split(body, comments=True):
-		name, sep, value = token.partition("=")
-		config_values[name] = value
-	return config_values
diff --git a/phaul/__init__.py b/phaul/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/phaul/criu_api.py b/phaul/criu_api.py
new file mode 100644
index 0000000..a2f70b7
--- /dev/null
+++ b/phaul/criu_api.py
@@ -0,0 +1,109 @@
+#
+# CRIU API
+# Includes class to work with CRIU service and helpers
+#
+
+import socket
+import struct
+import os
+import util
+import subprocess
+import pycriu.rpc as cr_rpc
+import pycriu.images.stats_pb2 as crs
+
+criu_binary = "criu"
+
+req_types = {
+	cr_rpc.DUMP: "dump",
+	cr_rpc.PRE_DUMP: "pre_dump",
+	cr_rpc.PAGE_SERVER: "page_server",
+	cr_rpc.RESTORE: "restore",
+	cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
+	cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
+}
+
+cpuinfo_img_name = "cpuinfo.img"
+
+def_verb = 2
+
+#
+# Connection to CRIU service
+#
+
+class criu_conn:
+	def __init__(self, mem_sk):
+		self._iter = 0
+		self.verb = def_verb
+		css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+		util.set_cloexec(css[1])
+		print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
+		self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
+		css[0].close()
+		self._cs = css[1]
+		self._last_req = -1
+		self._mem_fd = mem_sk.fileno()
+
+	def close(self):
+		self._cs.close()
+		self._swrk.wait()
+
+	def mem_sk_fileno(self):
+		return self._mem_fd
+
+	def verbose(self, level):
+		self.verb = level
+
+	def _recv_resp(self):
+		resp = cr_rpc.criu_resp()
+		resp.ParseFromString(self._cs.recv(1024))
+		if not resp.type in (cr_rpc.NOTIFY, self._last_req):
+			raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
+
+		return resp
+
+	def send_req(self, req):
+		req.opts.log_level = self.verb
+		req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
+		self._cs.send(req.SerializeToString())
+		self._iter += 1
+		self._last_req = req.type
+
+		return self._recv_resp()
+
+	def ack_notify(self, success = True):
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.NOTIFY
+		req.notify_success = success  # forward the caller's verdict (was hard-coded True)
+		self._cs.send(req.SerializeToString())
+		# Return the next response read from the control socket
+		return self._recv_resp()
+
+#
+# Helper to read CRIU-generated statistics
+#
+
+CRIU_STATS_MAGIC = 0x57093306
+
+def criu_get_stats(img, file_name):
+	s = struct.Struct("I I")
+	with open(os.path.join(img.work_dir(), file_name), "rb") as f:
+		#
+		# Stats file is 4 bytes of magic, then 4 bytes with
+		# stats packet size
+		#
+		v = s.unpack(f.read(s.size))
+		if v[0] != CRIU_STATS_MAGIC:
+			raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
+
+		stats = crs.stats_entry()
+		stats.ParseFromString(f.read(v[1]))
+
+	return stats
+
+def criu_get_dstats(img):
+	stats = criu_get_stats(img, "stats-dump")
+	return stats.dump
+
+def criu_get_rstats(img):
+	stats = criu_get_stats(img, "stats-restore")
+	return stats.restore
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
new file mode 100644
index 0000000..54e231a
--- /dev/null
+++ b/phaul/fs_haul_shared.py
@@ -0,0 +1,28 @@
+#
+# Shared FS hauler (noop)
+#
+
+class p_haul_fs:
+	def __init__(self):
+		print "Initilized shared FS hauler"
+		pass
+
+	def set_target_host(self, thost):
+		pass
+
+	def set_work_dir(self, wdir):
+		pass
+
+	def start_migration(self):
+		pass
+
+	def next_iteration(self):
+		pass
+
+	def stop_migration(self):
+		pass
+
+	# Inode numbers do not change on this FS
+	# during migration
+	def persistent_inodes(self):
+		return True
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
new file mode 100644
index 0000000..3181c79
--- /dev/null
+++ b/phaul/fs_haul_subtree.py
@@ -0,0 +1,50 @@
+#
+# FS haul driver, that copies the subtree from
+# one node to another using rsync. It's used in
+# legacy OpenVZ configurations.
+#
+
+import subprocess as sp
+import os
+
+rsync_log_file = "rsync.log"
+
+class p_haul_fs:
+	def __init__(self, subtree_path):
+		print "Initialized subtree FS hauler (%s)" % subtree_path
+		self.__root = subtree_path
+		pass
+
+	def set_target_host(self, thost):
+		self.__thost = thost
+
+	def set_work_dir(self, wdir):
+		self.__wdir = wdir
+
+	def __run_rsync(self):
+		dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
+
+		# First rsync might be very long. Wait for it not
+		# to produce big pause between the 1st pre-dump and
+		# .stop_migration
+
+		with open(os.path.join(self.__wdir, rsync_log_file), "w+") as logf:
+			ret = sp.call(["rsync", "-a", self.__root, dst],
+					stdout = logf, stderr = logf)
+		if ret != 0:
+			raise Exception("Rsync failed")
+
+	def start_migration(self):
+		print "Starting FS migration"
+		self.__run_rsync()
+
+	def next_iteration(self):
+		pass
+
+	def stop_migration(self):
+		print "Doing final FS sync"
+		self.__run_rsync()
+
+	# When rsync-ing FS inodes number will change
+	def persistent_inodes(self):
+		return False
diff --git a/phaul/images.py b/phaul/images.py
new file mode 100644
index 0000000..a4d465c
--- /dev/null
+++ b/phaul/images.py
@@ -0,0 +1,176 @@
+#
+# images driver for migration (without FS transfer)
+#
+
+import os
+import tempfile
+import tarfile
+import time
+import shutil
+import time
+import threading
+import util
+import criu_api
+
+def_path = "/var/local/p.haul-fs/"
+
+class opendir:
+	def __init__(self, path):
+		self._dirname = path
+		self._dirfd = os.open(path, os.O_DIRECTORY)
+		util.set_cloexec(self)
+
+	def close(self):
+		os.close(self._dirfd)
+		# Keep _dirname: phaul_images.close() calls name() after close()
+		self._dirfd = -1
+
+	def name(self):
+		return self._dirname
+
+	def fileno(self):
+		return self._dirfd
+
+class untar_thread(threading.Thread):
+	def __init__(self, sk, tdir):
+		threading.Thread.__init__(self)
+		self.__sk = sk
+		self.__dir = tdir
+
+	def run(self):
+		tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
+		tf.extractall(self.__dir)
+		tf.close()
+
+class img_tar:
+	def __init__(self, sk, dirname):
+		self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
+		self.__dir = dirname
+
+	def add(self, img, path = None):
+		if not path:
+			path = os.path.join(self.__dir, img)
+
+		self.__tf.add(path, img)
+
+	def close(self):
+		self.__tf.close()
+
+class phaul_images:
+	WDIR = 1
+	IMGDIR = 2
+
+	def __init__(self, typ):
+		self.current_iter = 0
+		self.sync_time = 0.0
+		self._typ = typ
+		self._keep_on_close = False
+		self._wdir = None
+		self._current_dir = None
+
+	def save_images(self):
+		print "Keeping images"
+		self._keep_on_close = True
+
+	def set_options(self, opts):
+		self._keep_on_close = opts["keep_images"]
+
+		suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
+		util.makedirs(opts["img_path"])
+		wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
+		self._wdir = opendir(wdir)
+		self._img_path = os.path.join(self._wdir.name(), "img")
+		os.mkdir(self._img_path)
+
+	def close(self):
+		if not self._wdir:
+			return
+
+		self._wdir.close()
+		if self._current_dir:
+			self._current_dir.close()
+
+		if not self._keep_on_close:
+			print "Removing images"
+			shutil.rmtree(self._wdir.name())
+		else:
+			print "Images are kept in %s" % self._wdir.name()
+		pass
+
+	def img_sync_time(self):
+		return self.sync_time
+
+	def new_image_dir(self):
+		if self._current_dir:
+			self._current_dir.close()
+		self.current_iter += 1
+		img_dir = "%s/%d" % (self._img_path, self.current_iter)
+		print "\tMaking directory %s" % img_dir
+		os.mkdir(img_dir)
+		self._current_dir = opendir(img_dir)
+
+	def image_dir_fd(self):
+		return self._current_dir.fileno()
+
+	def work_dir_fd(self):
+		return self._wdir.fileno()
+
+	def image_dir(self):
+		return self._current_dir.name()
+
+	def work_dir(self):
+		return self._wdir.name()
+
+	def prev_image_dir(self):
+		if self.current_iter == 1:
+			return None
+		else:
+			return "../%d" % (self.current_iter - 1)
+
+	# Images transfer
+	# Are there better ways for doing this?
+
+	def sync_imgs_to_target(self, th, htype, sock):
+		# Pre-dump doesn't generate any images (yet?)
+		# so copy only those from the top dir
+		print "Sending images to target"
+
+		start = time.time()
+		cdir = self.image_dir()
+
+		th.start_accept_images(phaul_images.IMGDIR)
+		tf = img_tar(sock, cdir)
+
+		print "\tPack"
+		for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
+			tf.add(img)
+
+		print "\tAdd htype images"
+		for himg in htype.get_meta_images(cdir):
+			tf.add(himg[1], himg[0])
+
+		tf.close()
+		th.stop_accept_images()
+
+		self.sync_time = time.time() - start
+
+	def send_cpuinfo(self, th, sock):
+		th.start_accept_images(phaul_images.WDIR)
+		tf = img_tar(sock, self.work_dir())
+		tf.add(criu_api.cpuinfo_img_name)
+		tf.close()
+		th.stop_accept_images()
+
+	def start_accept_images(self, dir_id, sk):
+		if dir_id == phaul_images.WDIR:
+			dirname = self.work_dir()
+		else:
+			dirname = self.image_dir()
+
+		self.__acc_tar = untar_thread(sk, dirname)
+		self.__acc_tar.start()
+		print "Started images server"
+
+	def stop_accept_images(self):
+		print "Waiting for images to unpack"
+		self.__acc_tar.join()
diff --git a/phaul/mstats.py b/phaul/mstats.py
new file mode 100644
index 0000000..f1a9c32
--- /dev/null
+++ b/phaul/mstats.py
@@ -0,0 +1,33 @@
+import time
+
+def usec2sec(usec):
+	return usec / 1000000.
+
+class migration_stats:
+	def __init__(self):
+		self._iter_fr_times = []
+		self._frozen_time = 0
+
+	def start(self):
+		self._start_time = time.time()
+
+	def stop(self, iters):
+		self._rst_time = iters.th.restore_time()
+		self._img_sync_time = iters.img.img_sync_time()
+		self._end_time = time.time()
+
+		self._print_stats()
+
+	def iteration(self, stats):
+		print "Dumped %d pages, %d skipped" % \
+				(stats.pages_written, stats.pages_skipped_parent)
+
+		self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
+		self._frozen_time += stats.frozen_time
+
+	def _print_stats(self):
+		print "Migration succeeded"
+		print "\t   total time is ~%.2lf sec" % (self._end_time - self._start_time)
+		print "\t  frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
+		print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
+		print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/phaul/p_haul_cgroup.py b/phaul/p_haul_cgroup.py
new file mode 100644
index 0000000..846ee62
--- /dev/null
+++ b/phaul/p_haul_cgroup.py
@@ -0,0 +1,79 @@
+#
+# CGroups manipulations for p.haul.
+#
+# FIXME Isn't it nicer to do it via libcgroup?
+#
+
+import os
+
+cg_root_dir = "/sys/fs/cgroup"
+cg_tasks_file = "tasks"
+
+def cg_line_parse(ln):
+	items = ln.split(":")
+	#
+	# If two controllers are merged together, we see
+	# their names comma-separated in proc. The problem
+	# is that the respective directory name in sysfs
+	# (!) IS NOT THE SAME, controller names can go
+	# reversed.
+	#
+	# That said, we just use the first name component,
+	# in sysfs there would be the respective symlink
+	#
+	cname = items[1].split(",")[0]
+	cdir = items[2]
+
+	return cname, cdir
+
+def dump_hier(pid, img):
+	print "\tSave CG for %d into %s" % (pid, img)
+	fd = open(img, "w")
+	cg = open("/proc/%d/cgroup" % pid)
+	for ln in cg:
+		cg_controller, cg_dir = cg_line_parse(ln)
+		if not cg_controller.startswith("name="):
+			fd.write("%s%s" % (cg_controller, cg_dir))
+
+	cg.close()
+	fd.close()
+
+#
+# The cpuset controller is unusable before at least one
+# cpu and memory node is set there. For restore it's OK
+# to copy parent masks into it, at the end we'll apply
+# "real" CT config
+#
+
+def cpuset_copy_parent(path, c):
+	c = "cpuset.%s" % c
+	ppath = os.path.dirname(path)
+	pfd = open(os.path.join(ppath, c))
+	cfd = open(os.path.join(path, c), "w")
+	cfd.write(pfd.read())
+	cfd.close()
+	pfd.close()
+
+def cpuset_allow_all(path):
+	cpuset_copy_parent(path, "cpus")
+	cpuset_copy_parent(path, "mems")
+
+def restore_one_controller(pid, ln):
+	cg_path = os.path.join(cg_root_dir, ln.strip())
+	print "[%s]" % cg_path
+	if not os.access(cg_path, os.F_OK):
+		os.makedirs(cg_path)
+	if ln.startswith("cpuset"):
+		cpuset_allow_all(cg_path)
+
+	tf = open(os.path.join(cg_path, cg_tasks_file), "w")
+	tf.write("%d" % pid)
+	tf.close()
+
+def restore_hier(pid, img):
+	print "\tCreate hier for %d from %s" % (pid, img)
+	fd = open(img)
+	for ln in fd:
+		restore_one_controller(pid, ln)
+	fd.close()
+	pass
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
new file mode 100644
index 0000000..7a898da
--- /dev/null
+++ b/phaul/p_haul_iters.py
@@ -0,0 +1,232 @@
+#
+# The P.HAUL core -- the class that drives migration
+#
+
+import images
+import mstats
+import xem_rpc
+import pycriu.rpc as cr_rpc
+import criu_api
+import p_haul_type
+
+# Constants for iterations management
+#
+# Maximum number of iterations
+phaul_iter_max = 8
+# If we dump less than this amount of pages we abort
+# iterations and go do the full dump
+phaul_iter_min_size = 64
+# Each iteration should dump less pages or at most
+# this % more than previous
+phaul_iter_grow_max = 10
+
+class phaul_iter_worker:
+	def __init__(self, p_type, host):
+		self._mstat = mstats.migration_stats()
+		self.iteration = 0
+		self.prev_stats = None
+
+		print "Connecting to target host"
+		self.th = xem_rpc.rpc_proxy(host)
+		self.data_sk = self.th.open_socket("datask")
+
+		print "Setting up local"
+		self.img = images.phaul_images("dmp")
+		self.criu = criu_api.criu_conn(self.data_sk)
+		self.htype = p_haul_type.get_src(p_type)
+		if not self.htype:
+			raise Exception("No htype driver found")
+
+		self.fs = self.htype.get_fs()
+		if not self.fs:
+			raise Exception("No FS driver found")
+
+		self.pid = self.htype.root_task_pid()
+		self.fs.set_target_host(host[0])
+
+		print "Setting up remote"
+		self.th.setup(p_type)
+
+
+	def make_dump_req(self, typ):
+		#
+		# Prepare generic request for (pre)dump
+		#
+
+		req = cr_rpc.criu_req()
+		req.type = typ
+		req.opts.pid = self.pid
+		req.opts.ps.fd = self.criu.mem_sk_fileno()
+		req.opts.track_mem = True
+
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		p_img = self.img.prev_image_dir()
+		if p_img:
+			req.opts.parent_img = p_img
+		if not self.fs.persistent_inodes():
+			req.opts.force_irmap = True
+
+		return req
+
+	def set_options(self, opts):
+		self.th.set_options(opts)
+		self.criu.verbose(opts["verbose"])
+		self.img.set_options(opts)
+		self.htype.set_options(opts)
+		self.__force = opts["force"]
+
+	def validate_cpu(self):
+		print "Checking CPU compatibility"
+
+		print "  `- Dumping CPU info"
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.CPUINFO_DUMP
+		req.opts.images_dir_fd = self.img.work_dir_fd()
+		req.keep_open = True
+		resp = self.criu.send_req(req)
+		if not resp.success:
+			raise Exception("Can't dump cpuinfo")
+
+		print "  `- Sending CPU info"
+		self.img.send_cpuinfo(self.th, self.data_sk)
+
+		print "  `- Checking CPU info"
+		if not self.th.check_cpuinfo():
+			raise Exception("CPUs mismatch")
+
+	def start_migration(self):
+		self._mstat.start()
+
+		if not self.__force:
+			self.validate_cpu()
+
+		print "Preliminary FS migration"
+		self.fs.set_work_dir(self.img.work_dir())
+		self.fs.start_migration()
+
+		print "Starting iterations"
+		cc = self.criu
+
+		while True:
+			print "* Iteration %d" % self.iteration
+
+			self.th.start_iter()
+			self.img.new_image_dir()
+
+			print "\tIssuing pre-dump command to service"
+
+			req = self.make_dump_req(cr_rpc.PRE_DUMP)
+			resp = cc.send_req(req)
+			if not resp.success:
+				raise Exception("Pre-dump failed")
+
+			print "\tPre-dump succeeded"
+
+			self.th.end_iter()
+
+			stats = criu_api.criu_get_dstats(self.img)
+			self._mstat.iteration(stats)
+
+			#
+			# Need to decide whether we do next iteration
+			# or stop on the existing and go do full dump
+			# and restore
+			#
+
+			print "Checking iteration progress:"
+
+			if stats.pages_written <= phaul_iter_min_size:
+				print "\t> Small dump"
+				break;
+
+			if self.prev_stats:
+				w_add = stats.pages_written - self.prev_stats.pages_written
+				w_add = w_add * 100 / self.prev_stats.pages_written
+				if w_add > phaul_iter_grow_max:
+					print "\t> Iteration grows"
+					break
+
+			if self.iteration >= phaul_iter_max:
+				print "\t> Too many iterations"
+				break
+
+			self.iteration += 1
+			self.prev_stats = stats
+			print "\t> Proceed to next iteration"
+
+			self.fs.next_iteration()
+
+		#
+		# Finish with iterations -- do full dump, send images
+		# to target host and restore from them there
+		#
+
+		print "Final dump and restore"
+
+		self.th.start_iter()
+		self.img.new_image_dir()
+
+		print "\tIssuing dump command to service"
+		req = self.make_dump_req(cr_rpc.DUMP)
+		req.opts.notify_scripts = True
+		req.opts.file_locks = True
+		req.opts.evasive_devices = True
+		req.opts.link_remap = True
+		if self.htype.can_migrate_tcp():
+			req.opts.tcp_established = True
+
+		resp = cc.send_req(req)
+		while True:
+			if resp.type != cr_rpc.NOTIFY:
+				raise Exception("Dump failed")
+
+			if resp.notify.script == "post-dump":
+				#
+				# Dump is effectively over. Now CRIU
+				# waits for us to do whatever we want
+				# and keeps the tasks frozen.
+				#
+				break
+
+			elif resp.notify.script == "network-lock":
+				self.htype.net_lock()
+			elif resp.notify.script == "network-unlock":
+				self.htype.net_unlock()
+
+			print "\t\tNotify (%s)" % resp.notify.script
+			resp = cc.ack_notify()
+
+		print "Dump complete"
+		self.th.end_iter()
+
+		#
+		# Dump is complete -- go to target node,
+		# restore them there and kill (if required)
+		# tasks on source node
+		#
+
+		print "Final FS and images sync"
+		self.fs.stop_migration()
+		self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
+
+		print "Asking target host to restore"
+		self.th.restore_from_images()
+
+		#
+		# Ack the notify after restore -- CRIU would
+		# then terminate all tasks and send us back
+		# DUMP/success message
+		#
+
+		resp = cc.ack_notify()
+		if not resp.success:
+			raise Exception("Dump screwed up")
+
+		self.htype.umount()
+
+		stats = criu_api.criu_get_dstats(self.img)
+		self._mstat.iteration(stats)
+		self._mstat.stop(self)
+		self.img.close()
+		cc.close()
diff --git a/phaul/p_haul_lxc.py b/phaul/p_haul_lxc.py
new file mode 100644
index 0000000..27df698
--- /dev/null
+++ b/phaul/p_haul_lxc.py
@@ -0,0 +1,166 @@
+#
+# LinuX Containers hauler module
+#
+
+import os
+import shutil
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+from subprocess import Popen, PIPE
+
+name = "lxc"
+lxc_dir = "/var/lib/lxc/"
+lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
+cg_image_name = "lxccg.img"
+
+class p_haul_type:
+	def __init__(self, name):
+		self._ctname = name
+		#
+		# This list would contain (v_in, v_out, v_bridge) tuples where
+		# v_in is the name of veth device in CT
+		# v_out is its peer on the host
+		# v_bridge is the bridge to which this veth is attached
+		#
+		self._veths = []
+		self._cfg = {}
+
+	def __load_ct_config(self):
+		print "Loading config file from %s" % self.__ct_config()
+
+		self._cfg = {}
+		self._veths = []
+
+		veth = None
+		# Each "lxc.network.type" key starts a new device; later keys fill it in
+		ifd = open(self.__ct_config())
+		for line in ifd:
+			if not ("=" in line):
+				continue
+			k, v = map(lambda a: a.strip(), line.split("=", 1))
+			self._cfg[k] = v
+
+			if k == "lxc.network.type":
+				if v != "veth":
+					raise Exception("Unsupported network device type: %s" % v)
+				if veth:
+					self._veths.append(veth)
+				veth = util.net_dev()
+			elif k == "lxc.network.link":
+				veth.link = v
+			elif k == "lxc.network.name":
+				veth.name = v
+			elif k == "lxc.network.veth.pair":
+				veth.pair = v
+		if veth:
+			self._veths.append(veth)
+		ifd.close()
+
+	def __apply_cg_config(self):
+		print "Applying CT configs"
+		# FIXME -- implement
+		pass
+
+	def init_src(self):
+		self._fs_mounted = True
+		self._bridged = True
+		self.__load_ct_config()
+
+	def init_dst(self):
+		self._fs_mounted = False
+		self._bridged = False
+		self.__load_ct_config()
+
+	def set_options(self, opts):
+		pass
+
+	def root_task_pid(self):
+		pid = -1;
+
+		pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
+		for l in pd.stdout:
+			if l.startswith("PID:"):
+				pid = int(l.split(":")[1])
+		status = pd.wait()
+		if status:
+			raise Exception("lxc info -n %s failed: %d" %
+						(self._ctname, status))
+		if pid == -1:
+			raise Exception("CT isn't running")
+		return pid
+
+	def __ct_rootfs(self):
+		return self._cfg['lxc.rootfs']
+
+	def __ct_root(self):
+		return os.path.join(lxc_rootfs_dir, self._ctname)
+
+	def __ct_config(self):
+		return os.path.join(lxc_dir, self._ctname, "config")
+
+	#
+	# Meta-images for LXC -- container config and info about CGroups
+	#
+	def get_meta_images(self, dir):
+		cg_img = os.path.join(dir, cg_image_name)
+		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+		cfg_name = self.__ct_config()
+		return [ (cfg_name, "config"),
+			 (cg_img, cg_image_name) ]
+
+	def put_meta_images(self, dir):
+		print "Putting config file into %s" % lxc_dir
+
+		shutil.copy(os.path.join(dir, "config"), self.__ct_config())
+
+		# Keep this name, we'll need one in prepare_ct()
+		self.cg_img = os.path.join(dir, cg_image_name)
+
+	#
+	# Create cgroup hierarchy and put root task into it
+	# Hierarchy is unlimited, we will apply config limitations
+	# in ->restored->__apply_cg_config later
+	#
+	def prepare_ct(self, pid):
+		p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+	def mount(self):
+		nroot = self.__ct_root()
+		print "Mounting CT root to %s" % nroot
+		if not os.access(nroot, os.F_OK):
+			os.makedirs(nroot)
+		os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
+		self._fs_mounted = True
+		return nroot
+
+	def get_fs(self):
+		return fs_haul_shared.p_haul_fs()
+
+	def restored(self, pid):
+		self.__apply_cg_config()
+
+	def net_lock(self):
+		for veth in self._veths:
+			util.ifdown(veth.pair)
+
+	def net_unlock(self):
+		for veth in self._veths:
+			util.ifup(veth.pair)
+			if veth.link and not self._bridged:
+				util.bridge_add(veth.pair, veth.link)
+
+	def can_migrate_tcp(self):
+		return True
+
+	def umount(self):
+		pass
+
+	def veths(self):
+		#
+		# Caller wants to see list of tuples with [0] being name
+		# in CT and [1] being name on host. Just return existing
+		# tuples, the [2] with bridge name wouldn't hurt
+		#
+		return self._veths
diff --git a/phaul/p_haul_ovz.py b/phaul/p_haul_ovz.py
new file mode 100644
index 0000000..bf719d0
--- /dev/null
+++ b/phaul/p_haul_ovz.py
@@ -0,0 +1,18 @@
+#
+# OpenVZ containers hauler module
+#
+
+import os
+import p_haul_vz
+
+name = "ovz"
+vzpid_dir = "/var/lib/vzctl/vepid/"
+
+class p_haul_type(p_haul_vz.p_haul_type):
+	def __init__(self, ctid):
+		p_haul_vz.p_haul_type.__init__(self, ctid)
+
+	def root_task_pid(self):
+		with open(os.path.join(vzpid_dir, self._ctid)) as pf:
+			pid = pf.read()
+			return int(pid)
diff --git a/phaul/p_haul_pid.py b/phaul/p_haul_pid.py
new file mode 100644
index 0000000..f0051b0
--- /dev/null
+++ b/phaul/p_haul_pid.py
@@ -0,0 +1,84 @@
+#
+# Individual process hauler
+#
+
+import fs_haul_shared
+
+name = "pid"
+
+class p_haul_type:
+	def __init__(self, id):
+		self.pid = int(id)
+		self._pidfile = None
+
+
+	#
+	# Initialize itself for source node or destination one
+	#
+	def init_src(self):
+		pass
+	def init_dst(self):
+		pass
+
+	def set_options(self, opts):
+		self._pidfile = opts["dst_rpid"]
+		self._fs_root = opts["pid_root"]
+
+	# Report the pid of the root task of what we're
+	# going to migrate
+	def root_task_pid(self):
+		return self.pid
+
+	# Prepare filesystem before restoring. Return
+	# the new root, if required. 'None' will mean
+	# that things will get restored in the current
+	# mount namespace and w/o chroot
+	def mount(self):
+		return self._fs_root
+
+	# Remove any specific FS setup
+	def umount(self):
+		pass
+
+	# Get driver for FS migration
+	def get_fs(self):
+		return fs_haul_shared.p_haul_fs()
+
+	# Get list of files which should be copied to
+	# the destination node. The dir argument is where
+	# temporary stuff can be put
+	def get_meta_images(self, dir):
+		return []
+
+	# Take your files from dir and put in whatever
+	# places are appropriate. Paths (relative) are
+	# preserved.
+	def put_meta_images(self, dir):
+		pass
+
+	# Restore has started, but so far only the
+	# first task (with pid @pid) has been created.
+	def prepare_ct(self, pid):
+		pass
+
+	# Restoring done, the new top task has pid pid
+	def restored(self, pid):
+		if self._pidfile:
+			print "Writing rst pidfile"
+			open(self._pidfile, "w").writelines(["%d" % pid])
+
+	#
+	# Lock and unlock networking
+	#
+	def net_lock(self):
+		pass
+
+	def net_unlock(self):
+		pass
+
+	def can_migrate_tcp(self):
+		return False
+
+	# Get list of veth pairs if any
+	def veths(self):
+		return []
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
new file mode 100644
index 0000000..d3e19ff
--- /dev/null
+++ b/phaul/p_haul_service.py
@@ -0,0 +1,154 @@
+#
+# P.HAUL code, that helps on the target node (rpyc service)
+#
+
+import xem_rpc
+import pycriu.rpc as cr_rpc
+import images
+import criu_api
+import p_haul_type
+
+class phaul_service:
+	def on_connect(self):
+		print "Connected"
+		self.dump_iter = 0
+		self.restored = False
+		self.criu = None
+		self.data_sk = None
+		self.img = None
+		self.htype = None
+
+	def on_disconnect(self):
+		print "Disconnected"
+		if self.criu:
+			self.criu.close()
+
+		if self.data_sk:
+			self.data_sk.close()
+
+		if self.htype and not self.restored:
+			self.htype.umount()
+
+		if self.img:
+			print "Closing images"
+			if not self.restored:
+				self.img.save_images()
+			self.img.close()
+
+	def on_socket_open(self, sk, uname):
+		self.data_sk = sk
+		print "Data socket (%s) accepted" % uname
+
+	def rpc_setup(self, htype_id):
+		print "Setting up service side", htype_id
+		self.img = images.phaul_images("rst")
+		self.criu = criu_api.criu_conn(self.data_sk)
+		self.htype = p_haul_type.get_dst(htype_id)
+
+	def rpc_set_options(self, opts):
+		self.criu.verbose(opts["verbose"])
+		self.img.set_options(opts)
+		self.htype.set_options(opts)
+
+	def start_page_server(self):
+		print "Starting page server for iter %d" % self.dump_iter
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.PAGE_SERVER
+		req.keep_open = True
+		req.opts.ps.fd = self.criu.mem_sk_fileno()
+
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		p_img = self.img.prev_image_dir()
+		if p_img:
+			req.opts.parent_img = p_img
+
+		print "\tSending criu rpc req"
+		resp = self.criu.send_req(req)
+		if not resp.success:
+			raise Exception("Failed to start page server")
+
+		print "\tPage server started at %d" % resp.ps.pid
+
+	def rpc_start_iter(self):
+		self.dump_iter += 1
+		self.img.new_image_dir()
+		self.start_page_server()
+
+	def rpc_end_iter(self):
+		pass
+
+	def rpc_start_accept_images(self, dir_id):
+		self.img.start_accept_images(dir_id, self.data_sk)
+
+	def rpc_stop_accept_images(self):
+		self.img.stop_accept_images()
+
+	def rpc_check_cpuinfo(self):
+		print "Checking cpuinfo"
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.CPUINFO_CHECK
+		req.opts.images_dir_fd = self.img.work_dir_fd()
+		req.keep_open = True
+		resp = self.criu.send_req(req)
+		print "   `-", resp.success
+		return resp.success
+
+	def rpc_restore_from_images(self):
+		print "Restoring from images"
+		self.htype.put_meta_images(self.img.image_dir())
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.RESTORE
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		req.opts.notify_scripts = True
+
+		if self.htype.can_migrate_tcp():
+			req.opts.tcp_established = True
+
+		for veth in self.htype.veths():
+			v = req.opts.veths.add()
+			v.if_in = veth.name
+			v.if_out = veth.pair
+
+		nroot = self.htype.mount()
+		if nroot:
+			req.opts.root = nroot
+			print "Restore root set to %s" % req.opts.root
+
+		cc = self.criu
+		resp = cc.send_req(req)
+		while True:
+			if resp.type == cr_rpc.NOTIFY:
+				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
+				if resp.notify.script == "setup-namespaces":
+					#
+					# At that point we have only one task
+					# living in namespaces and waiting for
+					# us to ACK the notify. Htype might want
+					# to configure namespace (external net
+					# devices) and cgroups
+					#
+					self.htype.prepare_ct(resp.notify.pid)
+				elif resp.notify.script == "network-unlock":
+					self.htype.net_unlock()
+				elif resp.notify.script == "network-lock":
+					raise Exception("Locking network on restore?")
+
+				resp = cc.ack_notify()
+				continue
+
+			if not resp.success:
+				raise Exception("Restore failed")
+
+			print "Restore succeeded"
+			break
+
+		self.htype.restored(resp.restore.pid)
+		self.restored = True
+
+	def rpc_restore_time(self):
+		stats = criu_api.criu_get_rstats(self.img)
+		return stats.restore_time
diff --git a/phaul/p_haul_type.py b/phaul/p_haul_type.py
new file mode 100644
index 0000000..8cf1eb4
--- /dev/null
+++ b/phaul/p_haul_type.py
@@ -0,0 +1,35 @@
+#
+# Haulers' type selector
+# Types (htypes) are classes, that help hauling processes.
+# See p_haul_pid for comments on what such a class should look like.
+#
+
+import p_haul_vz
+import p_haul_ovz
+import p_haul_pid
+import p_haul_lxc
+
+haul_types = {
+	p_haul_vz.name: p_haul_vz,
+	p_haul_ovz.name: p_haul_ovz,
+	p_haul_pid.name: p_haul_pid,
+	p_haul_lxc.name: p_haul_lxc,
+}
+
+def __get(id):
+	if haul_types.has_key(id[0]):
+		h_type = haul_types[id[0]]
+		return h_type.p_haul_type(id[1])
+	else:
+		print "Unknown type. Try one of", haul_types.keys()
+	 	return None
+
+def get_src(id):
+	ht = __get(id)
+	ht.init_src()
+	return ht
+
+def get_dst(id):
+	ht = __get(id)
+	ht.init_dst()
+	return ht
diff --git a/phaul/p_haul_vz.py b/phaul/p_haul_vz.py
new file mode 100644
index 0000000..40dc943
--- /dev/null
+++ b/phaul/p_haul_vz.py
@@ -0,0 +1,189 @@
+#
+# Virtuozzo containers hauler module
+#
+
+import os
+import shlex
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+
+name = "vz"
+vz_dir = "/vz"
+vzpriv_dir = "%s/private" % vz_dir
+vzroot_dir = "%s/root" % vz_dir
+vz_conf_dir = "/etc/vz/conf/"
+vz_pidfiles = "/var/lib/vzctl/vepid/"
+cg_image_name = "ovzcg.img"
+
+class p_haul_type:
+	def __init__(self, ctid):
+		self._ctid = ctid
+		#
+		# This list would contain (v_in, v_out, v_bridge) tuples where
+		# v_in is the name of veth device in CT
+		# v_out is its peer on the host
+		# v_bridge is the bridge to which this veth is attached
+		#
+		self._veths = []
+		self._cfg = ""
+
+	def __load_ct_config(self, path):
+		print "Loading config file from %s" % path
+
+		with open(os.path.join(path, self.__ct_config())) as ifd:
+			self._cfg = ifd.read()
+
+		#
+		# Parse and keep veth pairs, later we will
+		# equip restore request with this data and
+		# will use it while (un)locking the network
+		#
+		config = parse_vz_config(self._cfg)
+		if "NETIF" in config:
+			v_in, v_out, v_bridge = None, None, None
+			for parm in config["NETIF"].split(","):
+				pa = parm.split("=")
+				if pa[0] == "ifname":
+					v_in = pa[1]
+				elif pa[0] == "host_ifname":
+					v_out = pa[1]
+				elif pa[0] == "bridge":
+					v_bridge = pa[1]
+			if v_in and v_out:
+				print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
+				self._veths.append(util.net_dev(v_in, v_out, v_bridge))
+
+	def __apply_cg_config(self):
+		print "Applying CT configs"
+		# FIXME -- implement
+		pass
+
+	def init_src(self):
+		self._fs_mounted = True
+		self._bridged = True
+		self.__load_ct_config(vz_conf_dir)
+
+	def init_dst(self):
+		self._fs_mounted = False
+		self._bridged = False
+
+	def set_options(self, opts):
+		pass
+
+	def root_task_pid(self):
+		# Expect the first line of the tasks file to contain the root pid of the CT
+		path = "/sys/fs/cgroup/memory/{0}/tasks".format(self._ctid)
+		with open(path) as tasks:
+			pid = tasks.readline()
+			return int(pid)
+
+	def __ct_priv(self):
+		return "%s/%s" % (vzpriv_dir, self._ctid)
+
+	def __ct_root(self):
+		return "%s/%s" % (vzroot_dir, self._ctid)
+
+	def __ct_config(self):
+		return "%s.conf" % self._ctid
+
+	#
+	# Meta-images for OVZ -- container config and info about CGroups
+	#
+	def get_meta_images(self, path):
+		cg_img = os.path.join(path, cg_image_name)
+		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+		cfg_name = self.__ct_config()
+		return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
+			 (cg_img, cg_image_name) ]
+
+	def put_meta_images(self, path):
+		print "Putting config file into %s" % vz_conf_dir
+
+		self.__load_ct_config(path)
+		with open(os.path.join(vz_conf_dir, self.__ct_config()), "w") as ofd:
+			ofd.write(self._cfg)
+
+		# Keep this name, we'll need one in prepare_ct()
+		self.cg_img = os.path.join(path, cg_image_name)
+
+	#
+	# Create cgroup hierarchy and put root task into it
+	# Hierarchy is unlimited, we will apply config limitations
+	# in ->restored->__apply_cg_config later
+	#
+	def prepare_ct(self, pid):
+		p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+	def __umount_root(self):
+		print "Umounting CT root"
+		os.system("umount %s" % self.__ct_root())
+		self._fs_mounted = False
+
+	def mount(self):
+		nroot = self.__ct_root()
+		print "Mounting CT root to %s" % nroot
+		if not os.access(nroot, os.F_OK):
+			os.makedirs(nroot)
+		os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
+		self._fs_mounted = True
+		return nroot
+
+	def umount(self):
+		if self._fs_mounted:
+			self.__umount_root()
+
+	def get_fs(self):
+		rootfs = util.path_to_fs(self.__ct_priv())
+		if not rootfs:
+			print "CT is on unknown FS"
+			return None
+
+		print "CT is on %s" % rootfs
+
+		if rootfs == "nfs":
+			return fs_haul_shared.p_haul_fs()
+		if rootfs == "ext3" or rootfs == "ext4":
+			return fs_haul_subtree.p_haul_fs(self.__ct_priv())
+
+		print "Unknown CT FS"
+		return None
+
+	def restored(self, pid):
+		print "Writing pidfile"
+		pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
+		pidfile.write("%d" % pid)
+		pidfile.close()
+
+		self.__apply_cg_config()
+
+	def net_lock(self):
+		for veth in self._veths:
+			util.ifdown(veth.pair)
+
+	def net_unlock(self):
+		for veth in self._veths:
+			util.ifup(veth.pair)
+			if veth.link and not self._bridged:
+				util.bridge_add(veth.pair, veth.link)
+
+	def can_migrate_tcp(self):
+		return True
+
+	def veths(self):
+		#
+		# Caller wants to see list of tuples with [0] being name
+		# in CT and [1] being name on host. Just return existing
+		# tuples, the [2] with bridge name wouldn't hurt
+		#
+		return self._veths
+
+def parse_vz_config(body):
+	""" Parse shell-like virtuozzo config file"""
+
+	config_values = dict()
+	for token in shlex.split(body, comments=True):
+		name, sep, value = token.partition("=")
+		config_values[name] = value
+	return config_values
diff --git a/phaul/util.py b/phaul/util.py
new file mode 100644
index 0000000..2f3ebb0
--- /dev/null
+++ b/phaul/util.py
@@ -0,0 +1,46 @@
+import os
+import fcntl
+import errno
+
+class net_dev:
+	def __init__(self, name=None, pair=None, link=None):
+		self.name = name
+		self.pair = pair
+		self.link = link
+
+def path_to_fs(path):
+	dev = os.stat(path)
+	dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
+	mfd = open("/proc/self/mountinfo")
+	for ln in mfd:
+		ln_p = ln.split(None, 9)
+		if dev_str == ln_p[2]:
+			return ln_p[8]
+
+	return None
+
+def ifup(ifname):
+	print "\t\tUpping %s" % ifname
+	os.system("ip link set %s up" % ifname)
+
+def ifdown(ifname):
+	print "\t\tDowning %s" % ifname
+	os.system("ip link set %s down" % ifname)
+
+def bridge_add(ifname, brname):
+	print "\t\tAdd %s to %s" % (ifname, brname)
+	os.system("brctl addif %s %s" % (brname, ifname))
+
+def set_cloexec(sk):
+	fd = sk.fileno()
+	flags = fcntl.fcntl(sk, fcntl.F_GETFD)
+	fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+def makedirs(dirpath):
+	try:
+		os.makedirs(dirpath)
+	except OSError as er:
+		if er.errno == errno.EEXIST and os.path.isdir(dirpath):
+			pass
+		else:
+			raise
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
new file mode 100644
index 0000000..ead0bf6
--- /dev/null
+++ b/phaul/xem_rpc.py
@@ -0,0 +1,197 @@
+import socket
+import select
+import threading
+import traceback
+import util
+
+rpc_port = 12345
+rpc_sk_buf = 256
+
+RPC_CMD = 1
+RPC_CALL = 2
+
+RPC_RESP = 1
+RPC_EXC = 2
+
+#
+# Client
+#
+
+class _rpc_proxy_caller:
+	def __init__(self, sk, typ, fname):
+		self._rpc_sk = sk
+		self._fn_typ = typ
+		self._fn_name = fname
+
+	def __call__(self, *args):
+		call = (self._fn_typ, self._fn_name, args)
+		raw_data = repr(call)
+		self._rpc_sk.send(raw_data)
+		raw_data = self._rpc_sk.recv(rpc_sk_buf)
+		resp = eval(raw_data)
+
+		if resp[0] == RPC_RESP:
+			return resp[1]
+		elif resp[0] == RPC_EXC:
+			print "Remote exception"
+			raise Exception(resp[1])
+		else:
+			raise Exception("Proto resp error")
+
+class rpc_proxy:
+	def __init__(self, conn, *args):
+		self._srv = conn
+		self._rpc_sk = self._make_sk()
+		util.set_cloexec(self._rpc_sk)
+		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
+
+	def __getattr__(self, attr):
+		return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
+
+	def _make_sk(self):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.connect(self._srv)
+		return sk
+
+	def open_socket(self, uname):
+		sk = self._make_sk()
+		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
+		c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
+		c(host, uname)
+		return sk
+
+
+
+#
+# Server
+#
+
+class _rpc_server_sk:
+	def __init__(self, sk):
+		self._sk = sk
+		self._master = None
+
+	def fileno(self):
+		return self._sk.fileno()
+
+	def hash_name(self):
+		return self._sk.getpeername()
+
+	def get_name(self, mgr):
+		return self.hash_name()
+
+	def work(self, mgr):
+		raw_data = self._sk.recv(rpc_sk_buf)
+		if not raw_data:
+			mgr.remove(self)
+			if self._master:
+				self._master.on_disconnect()
+			self._sk.close()
+			return
+
+		data = eval(raw_data)
+		try:
+			if data[0] == RPC_CALL:
+				if not self._master:
+					raise Exception("Proto seq error")
+
+				res = getattr(self._master, "rpc_" + data[1])(*data[2])
+			elif data[0] == RPC_CMD:
+				res = getattr(self, data[1])(mgr, *data[2])
+			else:
+				raise Exception(("Proto typ error", data[0]))
+		except Exception as e:
+			traceback.print_exc()
+			res = (RPC_EXC, e)
+		else:
+			res = (RPC_RESP, res)
+
+		raw_data = repr(res)
+		self._sk.send(raw_data)
+
+	def init_rpc(self, mgr, args):
+		util.set_cloexec(self)
+		self._master = mgr.make_master()
+		self._master.on_connect(*args)
+
+	def pick_channel(self, mgr, hash_name, uname):
+		sk = mgr.pick_sk(hash_name)
+		if sk:
+			self._master.on_socket_open(sk._sk, uname)
+
+class _rpc_server_ask:
+	def __init__(self, host):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.bind(host)
+		sk.listen(8)
+		self._sk = sk
+		util.set_cloexec(self)
+
+	def fileno(self):
+		return self._sk.fileno()
+
+	def work(self, mgr):
+		sk, addr = self._sk.accept()
+		mgr.add(_rpc_server_sk(sk))
+
+class _rpc_stop_fd:
+	def __init__(self, fd):
+		self._fd = fd
+
+	def fileno(self):
+		return self._fd.fileno()
+
+	def work(self, mgr):
+		mgr.stop()
+
+class _rpc_server_manager:
+	def __init__(self, srv_class, host):
+		self._srv_class = srv_class
+		self._sk_by_name = {}
+		self._poll_list = [_rpc_server_ask(host)]
+		self._alive = True
+
+	def add(self, sk):
+		self._sk_by_name[sk.hash_name()] = sk
+		self._poll_list.append(sk)
+
+	def remove(self, sk):
+		self._sk_by_name.pop(sk.hash_name())
+		self._poll_list.remove(sk)
+
+	def pick_sk(self, hash_name):
+		sk = self._sk_by_name.pop(hash_name, None)
+		if sk:
+			self._poll_list.remove(sk)
+		return sk
+
+	def make_master(self):
+		return self._srv_class()
+
+	def stop(self):
+		self._alive = False
+
+	def loop(self, stop_fd):
+		if stop_fd:
+			self._poll_list.append(_rpc_stop_fd(stop_fd))
+
+		while self._alive:
+			r, w, x = select.select(self._poll_list, [], [])
+			for sk in r:
+				sk.work(self)
+
+		print "RPC Service stops"
+
+class rpc_threaded_srv(threading.Thread):
+	def __init__(self, srv_class, host):
+		threading.Thread.__init__(self)
+		self._mgr = _rpc_server_manager(srv_class, host)
+		self._stop_fd = None
+
+	def run(self):
+		self._mgr.loop(self._stop_fd)
+
+	def get_stop_fd(self):
+		sks = socket.socketpair()
+		self._stop_fd = sks[0]
+		return sks[1]
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..e90baf4
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,9 @@
+from distutils.core import setup
+
+
+setup(name='phaul',
+      description='Tool to live-migrate containers and processes',
+      version='0.1.0',
+      license='GPLv2',
+      packages=['phaul'],
+      )
diff --git a/util.py b/util.py
deleted file mode 100644
index 2f3ebb0..0000000
--- a/util.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import fcntl
-import errno
-
-class net_dev:
-	def __init__(self, name=None, pair=None, link=None):
-		self.name = name
-		self.pair = pair
-		self.link = link
-
-def path_to_fs(path):
-	dev = os.stat(path)
-	dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
-	mfd = open("/proc/self/mountinfo")
-	for ln in mfd:
-		ln_p = ln.split(None, 9)
-		if dev_str == ln_p[2]:
-			return ln_p[8]
-
-	return None
-
-def ifup(ifname):
-	print "\t\tUpping %s" % ifname
-	os.system("ip link set %s up" % ifname)
-
-def ifdown(ifname):
-	print "\t\tDowning %s" % ifname
-	os.system("ip link set %s down" % ifname)
-
-def bridge_add(ifname, brname):
-	print "\t\tAdd %s to %s" % (ifname, brname)
-	os.system("brctl addif %s %s" % (brname, ifname))
-
-def set_cloexec(sk):
-	fd = sk.fileno()
-	flags = fcntl.fcntl(sk, fcntl.F_GETFD)
-	fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-
-def makedirs(dirpath):
-	try:
-		os.makedirs(dirpath)
-	except OSError as er:
-		if er.errno == errno.EEXIST and os.path.isdir(dirpath):
-			pass
-		else:
-			raise
diff --git a/xem_rpc.py b/xem_rpc.py
deleted file mode 100644
index ead0bf6..0000000
--- a/xem_rpc.py
+++ /dev/null
@@ -1,197 +0,0 @@
-import socket
-import select
-import threading
-import traceback
-import util
-
-rpc_port = 12345
-rpc_sk_buf = 256
-
-RPC_CMD = 1
-RPC_CALL = 2
-
-RPC_RESP = 1
-RPC_EXC = 2
-
-#
-# Client
-#
-
-class _rpc_proxy_caller:
-	def __init__(self, sk, typ, fname):
-		self._rpc_sk = sk
-		self._fn_typ = typ
-		self._fn_name = fname
-
-	def __call__(self, *args):
-		call = (self._fn_typ, self._fn_name, args)
-		raw_data = repr(call)
-		self._rpc_sk.send(raw_data)
-		raw_data = self._rpc_sk.recv(rpc_sk_buf)
-		resp = eval(raw_data)
-
-		if resp[0] == RPC_RESP:
-			return resp[1]
-		elif resp[0] == RPC_EXC:
-			print "Remote exception"
-			raise Exception(resp[1])
-		else:
-			raise Exception("Proto resp error")
-
-class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
-		self._rpc_sk = self._make_sk()
-		util.set_cloexec(self._rpc_sk)
-		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
-
-	def __getattr__(self, attr):
-		return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
-
-	def _make_sk(self):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect(self._srv)
-		return sk
-
-	def open_socket(self, uname):
-		sk = self._make_sk()
-		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
-		c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
-		c(host, uname)
-		return sk
-
-
-
-#
-# Server
-#
-
-class _rpc_server_sk:
-	def __init__(self, sk):
-		self._sk = sk
-		self._master = None
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def hash_name(self):
-		return self._sk.getpeername()
-
-	def get_name(self, mgr):
-		return self.hash_name()
-
-	def work(self, mgr):
-		raw_data = self._sk.recv(rpc_sk_buf)
-		if not raw_data:
-			mgr.remove(self)
-			if self._master:
-				self._master.on_disconnect()
-			self._sk.close()
-			return
-
-		data = eval(raw_data)
-		try:
-			if data[0] == RPC_CALL:
-				if not self._master:
-					raise Exception("Proto seq error")
-
-				res = getattr(self._master, "rpc_" + data[1])(*data[2])
-			elif data[0] == RPC_CMD:
-				res = getattr(self, data[1])(mgr, *data[2])
-			else:
-				raise Exception(("Proto typ error", data[0]))
-		except Exception as e:
-			traceback.print_exc()
-			res = (RPC_EXC, e)
-		else:
-			res = (RPC_RESP, res)
-
-		raw_data = repr(res)
-		self._sk.send(raw_data)
-
-	def init_rpc(self, mgr, args):
-		util.set_cloexec(self)
-		self._master = mgr.make_master()
-		self._master.on_connect(*args)
-
-	def pick_channel(self, mgr, hash_name, uname):
-		sk = mgr.pick_sk(hash_name)
-		if sk:
-			self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
-	def __init__(self, host):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(host)
-		sk.listen(8)
-		self._sk = sk
-		util.set_cloexec(self)
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def work(self, mgr):
-		sk, addr = self._sk.accept()
-		mgr.add(_rpc_server_sk(sk))
-
-class _rpc_stop_fd:
-	def __init__(self, fd):
-		self._fd = fd
-
-	def fileno(self):
-		return self._fd.fileno()
-
-	def work(self, mgr):
-		mgr.stop()
-
-class _rpc_server_manager:
-	def __init__(self, srv_class, host):
-		self._srv_class = srv_class
-		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask(host)]
-		self._alive = True
-
-	def add(self, sk):
-		self._sk_by_name[sk.hash_name()] = sk
-		self._poll_list.append(sk)
-
-	def remove(self, sk):
-		self._sk_by_name.pop(sk.hash_name())
-		self._poll_list.remove(sk)
-
-	def pick_sk(self, hash_name):
-		sk = self._sk_by_name.pop(hash_name, None)
-		if sk:
-			self._poll_list.remove(sk)
-		return sk
-
-	def make_master(self):
-		return self._srv_class()
-
-	def stop(self):
-		self._alive = False
-
-	def loop(self, stop_fd):
-		if stop_fd:
-			self._poll_list.append(_rpc_stop_fd(stop_fd))
-
-		while self._alive:
-			r, w, x = select.select(self._poll_list, [], [])
-			for sk in r:
-				sk.work(self)
-
-		print "RPC Service stops"
-
-class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class, host):
-		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class, host)
-		self._stop_fd = None
-
-	def run(self):
-		self._mgr.loop(self._stop_fd)
-
-	def get_stop_fd(self):
-		sks = socket.socketpair()
-		self._stop_fd = sks[0]
-		return sks[1]
-- 
1.7.1



More information about the CRIU mailing list