[CRIU] [PATCH 1/3] p.haul: move auxiliary stuff into phaul package

Ruslan Kuprieiev kupruser at gmail.com
Sun Nov 2 17:02:49 PST 2014


Let's store all non-executable p.haul sources inside the phaul package,
so that it can be easily installed and used.

Signed-off-by: Ruslan Kuprieiev <kupruser at gmail.com>
---
 .gitignore               |   2 -
 Makefile                 |   4 +-
 criu_api.py              | 109 ----------------------
 fs_haul_shared.py        |  28 ------
 fs_haul_subtree.py       |  50 ----------
 images.py                | 176 -----------------------------------
 mstats.py                |  33 -------
 p.haul                   |   6 +-
 p.haul-service           |   4 +-
 p_haul_cgroup.py         |  79 ----------------
 p_haul_iters.py          | 232 -----------------------------------------------
 p_haul_lxc.py            | 166 ---------------------------------
 p_haul_ovz.py            | 190 --------------------------------------
 p_haul_pid.py            |  84 -----------------
 p_haul_service.py        | 154 -------------------------------
 p_haul_type.py           |  33 -------
 phaul/.gitignore         |   2 +
 phaul/__init__.py        |  17 ++++
 phaul/criu_api.py        | 109 ++++++++++++++++++++++
 phaul/fs_haul_shared.py  |  28 ++++++
 phaul/fs_haul_subtree.py |  50 ++++++++++
 phaul/images.py          | 176 +++++++++++++++++++++++++++++++++++
 phaul/mstats.py          |  33 +++++++
 phaul/p_haul_cgroup.py   |  79 ++++++++++++++++
 phaul/p_haul_iters.py    | 232 +++++++++++++++++++++++++++++++++++++++++++++++
 phaul/p_haul_lxc.py      | 166 +++++++++++++++++++++++++++++++++
 phaul/p_haul_ovz.py      | 190 ++++++++++++++++++++++++++++++++++++++
 phaul/p_haul_pid.py      |  84 +++++++++++++++++
 phaul/p_haul_service.py  | 154 +++++++++++++++++++++++++++++++
 phaul/p_haul_type.py     |  33 +++++++
 phaul/util.py            |  46 ++++++++++
 phaul/xem_rpc.py         | 198 ++++++++++++++++++++++++++++++++++++++++
 util.py                  |  46 ----------
 xem_rpc.py               | 198 ----------------------------------------
 34 files changed, 1604 insertions(+), 1587 deletions(-)
 delete mode 100644 criu_api.py
 delete mode 100644 fs_haul_shared.py
 delete mode 100644 fs_haul_subtree.py
 delete mode 100644 images.py
 delete mode 100644 mstats.py
 delete mode 100644 p_haul_cgroup.py
 delete mode 100644 p_haul_iters.py
 delete mode 100644 p_haul_lxc.py
 delete mode 100644 p_haul_ovz.py
 delete mode 100644 p_haul_pid.py
 delete mode 100644 p_haul_service.py
 delete mode 100644 p_haul_type.py
 create mode 100644 phaul/.gitignore
 create mode 100644 phaul/__init__.py
 create mode 100644 phaul/criu_api.py
 create mode 100644 phaul/fs_haul_shared.py
 create mode 100644 phaul/fs_haul_subtree.py
 create mode 100644 phaul/images.py
 create mode 100644 phaul/mstats.py
 create mode 100644 phaul/p_haul_cgroup.py
 create mode 100644 phaul/p_haul_iters.py
 create mode 100644 phaul/p_haul_lxc.py
 create mode 100644 phaul/p_haul_ovz.py
 create mode 100644 phaul/p_haul_pid.py
 create mode 100644 phaul/p_haul_service.py
 create mode 100644 phaul/p_haul_type.py
 create mode 100644 phaul/util.py
 create mode 100644 phaul/xem_rpc.py
 delete mode 100644 util.py
 delete mode 100644 xem_rpc.py

diff --git a/.gitignore b/.gitignore
index 941b808..0d20b64 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1 @@
 *.pyc
-rpc_pb2.py
-stats_pb2.py
diff --git a/Makefile b/Makefile
index 6f7bde5..4513658 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ all: rpc_pb2.py stats_pb2.py
 .PHONY: all
 
 rpc_pb2.py: rpc.proto
-	protoc --proto_path=. --python_out=. rpc.proto
+	protoc --proto_path=. --python_out=./phaul rpc.proto
 
 stats_pb2.py: stats.proto
-	protoc --proto_path=. --python_out=. stats.proto
+	protoc --proto_path=. --python_out=./phaul stats.proto
diff --git a/criu_api.py b/criu_api.py
deleted file mode 100644
index 146088a..0000000
--- a/criu_api.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# CRIU API
-# Includes class to work with CRIU service and helpers
-#
-
-import socket
-import struct
-import os
-import util
-import subprocess
-import rpc_pb2 as cr_rpc
-import stats_pb2 as crs
-
-criu_binary = "criu"
-
-req_types = {
-	cr_rpc.DUMP: "dump",
-	cr_rpc.PRE_DUMP: "pre_dump",
-	cr_rpc.PAGE_SERVER: "page_server",
-	cr_rpc.RESTORE: "restore",
-	cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
-	cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
-}
-
-cpuinfo_img_name = "cpuinfo.img"
-
-def_verb = 2
-
-#
-# Connection to CRIU service
-#
-
-class criu_conn:
-	def __init__(self, mem_sk):
-		self._iter = 0
-		self.verb = def_verb
-		css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
-		util.set_cloexec(css[1])
-		print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
-		self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
-		css[0].close()
-		self._cs = css[1]
-		self._last_req = -1
-		self._mem_fd = mem_sk.fileno()
-
-	def close(self):
-		self._cs.close()
-		self._swrk.wait()
-
-	def mem_sk_fileno(self):
-		return self._mem_fd
-
-	def verbose(self, level):
-		self.verb = level
-
-	def _recv_resp(self):
-		resp = cr_rpc.criu_resp()
-		resp.ParseFromString(self._cs.recv(1024))
-		if not resp.type in (cr_rpc.NOTIFY, self._last_req):
-			raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
-
-		return resp
-
-	def send_req(self, req):
-		req.opts.log_level = self.verb
-		req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
-		self._cs.send(req.SerializeToString())
-		self._iter += 1
-		self._last_req = req.type
-
-		return self._recv_resp()
-
-	def ack_notify(self, success = True):
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.NOTIFY
-		req.notify_success = True
-		self._cs.send(req.SerializeToString())
-
-		return self._recv_resp()
-
-#
-# Helper to read CRIU-generated statistics
-#
-
-CRIU_STATS_MAGIC = 0x57093306
-
-def criu_get_stats(img, file_name):
-	s = struct.Struct("I I")
-	f = open(os.path.join(img.work_dir(), file_name))
-	#
-	# Stats file is 4 butes of magic, then 4 bytes with
-	# stats packet size
-	#
-	v = s.unpack(f.read(s.size))
-	if v[0] != CRIU_STATS_MAGIC:
-		raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
-
-	stats = crs.stats_entry()
-	stats.ParseFromString(f.read(v[1]))
-
-	return stats
-
-def criu_get_dstats(img):
-	stats = criu_get_stats(img, "stats-dump")
-	return stats.dump
-
-def criu_get_rstats(img):
-	stats = criu_get_stats(img, "stats-restore")
-	return stats.restore
diff --git a/fs_haul_shared.py b/fs_haul_shared.py
deleted file mode 100644
index 54e231a..0000000
--- a/fs_haul_shared.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Shared FS hauler (noop)
-#
-
-class p_haul_fs:
-	def __init__(self):
-		print "Initilized shared FS hauler"
-		pass
-
-	def set_target_host(self, thost):
-		pass
-
-	def set_work_dir(self, wdir):
-		pass
-
-	def start_migration(self):
-		pass
-
-	def next_iteration(self):
-		pass
-
-	def stop_migration(self):
-		pass
-
-	# Inode numbers do not change on this FS
-	# during migration
-	def persistent_inodes(self):
-		return True
diff --git a/fs_haul_subtree.py b/fs_haul_subtree.py
deleted file mode 100644
index 3181c79..0000000
--- a/fs_haul_subtree.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# FS haul driver, that copies the subtree from
-# one node to another using rsync. It's used in
-# legacy OpenVZ configurations.
-#
-
-import subprocess as sp
-import os
-
-rsync_log_file = "rsync.log"
-
-class p_haul_fs:
-	def __init__(self, subtree_path):
-		print "Initialized subtree FS hauler (%s)" % subtree_path
-		self.__root = subtree_path
-		pass
-
-	def set_target_host(self, thost):
-		self.__thost = thost
-
-	def set_work_dir(self, wdir):
-		self.__wdir = wdir
-
-	def __run_rsync(self):
-		logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
-		dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
-
-		# First rsync might be very long. Wait for it not
-		# to produce big pause between the 1st pre-dump and
-		# .stop_migration
-
-		ret = sp.call(["rsync", "-a", self.__root, dst],
-				stdout = logf, stderr = logf)
-		if ret != 0:
-			raise Exception("Rsync failed")
-
-	def start_migration(self):
-		print "Starting FS migration"
-		self.__run_rsync()
-
-	def next_iteration(self):
-		pass
-
-	def stop_migration(self):
-		print "Doing final FS sync"
-		self.__run_rsync()
-
-	# When rsync-ing FS inodes number will change
-	def persistent_inodes(self):
-		return False
diff --git a/images.py b/images.py
deleted file mode 100644
index a4d465c..0000000
--- a/images.py
+++ /dev/null
@@ -1,176 +0,0 @@
-#
-# images driver for migration (without FS transfer)
-#
-
-import os
-import tempfile
-import tarfile
-import time
-import shutil
-import time
-import threading
-import util
-import criu_api
-
-def_path = "/var/local/p.haul-fs/"
-
-class opendir:
-	def __init__(self, path):
-		self._dirname = path
-		self._dirfd = os.open(path, os.O_DIRECTORY)
-		util.set_cloexec(self)
-
-	def close(self):
-		os.close(self._dirfd)
-		os._dirname = None
-		os._dirfd = -1
-
-	def name(self):
-		return self._dirname
-
-	def fileno(self):
-		return self._dirfd
-
-class untar_thread(threading.Thread):
-	def __init__(self, sk, tdir):
-		threading.Thread.__init__(self)
-		self.__sk = sk
-		self.__dir = tdir
-
-	def run(self):
-		tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
-		tf.extractall(self.__dir)
-		tf.close()
-
-class img_tar:
-	def __init__(self, sk, dirname):
-		self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
-		self.__dir = dirname
-
-	def add(self, img, path = None):
-		if not path:
-			path = os.path.join(self.__dir, img)
-
-		self.__tf.add(path, img)
-
-	def close(self):
-		self.__tf.close()
-
-class phaul_images:
-	WDIR = 1
-	IMGDIR = 2
-
-	def __init__(self, typ):
-		self.current_iter = 0
-		self.sync_time = 0.0
-		self._typ = typ
-		self._keep_on_close = False
-		self._wdir = None
-		self._current_dir = None
-
-	def save_images(self):
-		print "Keeping images"
-		self._keep_on_close = True
-
-	def set_options(self, opts):
-		self._keep_on_close = opts["keep_images"]
-
-		suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
-		util.makedirs(opts["img_path"])
-		wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
-		self._wdir = opendir(wdir)
-		self._img_path = os.path.join(self._wdir.name(), "img")
-		os.mkdir(self._img_path)
-
-	def close(self):
-		if not self._wdir:
-			return
-
-		self._wdir.close()
-		if self._current_dir:
-			self._current_dir.close()
-
-		if not self._keep_on_close:
-			print "Removing images"
-			shutil.rmtree(self._wdir.name())
-		else:
-			print "Images are kept in %s" % self._wdir.name()
-		pass
-
-	def img_sync_time(self):
-		return self.sync_time
-
-	def new_image_dir(self):
-		if self._current_dir:
-			self._current_dir.close()
-		self.current_iter += 1
-		img_dir = "%s/%d" % (self._img_path, self.current_iter)
-		print "\tMaking directory %s" % img_dir
-		os.mkdir(img_dir)
-		self._current_dir = opendir(img_dir)
-
-	def image_dir_fd(self):
-		return self._current_dir.fileno()
-
-	def work_dir_fd(self):
-		return self._wdir.fileno()
-
-	def image_dir(self):
-		return self._current_dir.name()
-
-	def work_dir(self):
-		return self._wdir.name()
-
-	def prev_image_dir(self):
-		if self.current_iter == 1:
-			return None
-		else:
-			return "../%d" % (self.current_iter - 1)
-
-	# Images transfer
-	# Are there better ways for doing this?
-
-	def sync_imgs_to_target(self, th, htype, sock):
-		# Pre-dump doesn't generate any images (yet?)
-		# so copy only those from the top dir
-		print "Sending images to target"
-
-		start = time.time()
-		cdir = self.image_dir()
-
-		th.start_accept_images(phaul_images.IMGDIR)
-		tf = img_tar(sock, cdir)
-
-		print "\tPack"
-		for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
-			tf.add(img)
-
-		print "\tAdd htype images"
-		for himg in htype.get_meta_images(cdir):
-			tf.add(himg[1], himg[0])
-
-		tf.close()
-		th.stop_accept_images()
-
-		self.sync_time = time.time() - start
-
-	def send_cpuinfo(self, th, sock):
-		th.start_accept_images(phaul_images.WDIR)
-		tf = img_tar(sock, self.work_dir())
-		tf.add(criu_api.cpuinfo_img_name)
-		tf.close()
-		th.stop_accept_images()
-
-	def start_accept_images(self, dir_id, sk):
-		if dir_id == phaul_images.WDIR:
-			dirname = self.work_dir()
-		else:
-			dirname = self.image_dir()
-
-		self.__acc_tar = untar_thread(sk, dirname)
-		self.__acc_tar.start()
-		print "Started images server"
-
-	def stop_accept_images(self):
-		print "Waiting for images to unpack"
-		self.__acc_tar.join()
diff --git a/mstats.py b/mstats.py
deleted file mode 100644
index f1a9c32..0000000
--- a/mstats.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import time
-
-def usec2sec(usec):
-	return usec / 1000000.
-
-class migration_stats:
-	def __init__(self):
-		self._iter_fr_times = []
-		self._frozen_time = 0
-
-	def start(self):
-		self._start_time = time.time()
-
-	def stop(self, iters):
-		self._rst_time = iters.th.restore_time()
-		self._img_sync_time = iters.img.img_sync_time()
-		self._end_time = time.time()
-
-		self._print_stats()
-
-	def iteration(self, stats):
-		print "Dumped %d pages, %d skipped" % \
-				(stats.pages_written, stats.pages_skipped_parent)
-
-		self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
-		self._frozen_time += stats.frozen_time
-
-	def _print_stats(self):
-		print "Migration succeeded"
-		print "\t   total time is ~%.2lf sec" % (self._end_time - self._start_time)
-		print "\t  frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
-		print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
-		print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/p.haul b/p.haul
index 8c7f181..29b4c49 100755
--- a/p.haul
+++ b/p.haul
@@ -1,9 +1,9 @@
 #!/usr/bin/env python
 import sys
 import argparse
-import p_haul_iters as ph_iters
-import images
-import criu_api
+from phaul import p_haul_iters as ph_iters
+from phaul import images
+from phaul import criu_api
 
 # Usage idea
 # p.haul <type> <id> <destination>
diff --git a/p.haul-service b/p.haul-service
index cdeff54..1f6eccb 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -1,8 +1,8 @@
 #!/usr/bin/env python
 
 import signal
-import xem_rpc
-import p_haul_service as ph_srv
+from phaul import xem_rpc
+from phaul import p_haul_service as ph_srv
 
 if __name__ == "__main__":
 	sfd = None
diff --git a/p_haul_cgroup.py b/p_haul_cgroup.py
deleted file mode 100644
index 846ee62..0000000
--- a/p_haul_cgroup.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# CGroups manipulations for p.haul.
-#
-# FIXME Isn't it nicer to do it via libcgroup?
-#
-
-import os
-
-cg_root_dir = "/sys/fs/cgroup"
-cg_tasks_file = "tasks"
-
-def cg_line_parse(ln):
-	items = ln.split(":")
-	#
-	# If two controllers are merged tigether, we see
-	# their names comma-separated in proc. The problem
-	# is that the respective directory name in sysfsis
-	# (!) IS NOT THE SAME, controller names can go
-	# reversed.
-	#
-	# That said, we just use the first name component,
-	# in sysfs there would the respective symlink
-	#
-	cname = items[1].split(",")[0]
-	cdir = items[2]
-
-	return cname, cdir
-
-def dump_hier(pid, img):
-	print "\tSave CG for %d into %s" % (pid, img)
-	fd = open(img, "w")
-	cg = open("/proc/%d/cgroup" % pid)
-	for ln in cg:
-		cg_controller, cg_dir = cg_line_parse(ln)
-		if not cg_controller.startswith("name="):
-			fd.write("%s%s" % (cg_controller, cg_dir))
-
-	cg.close()
-	fd.close()
-
-#
-# The cpuset controller is unusable before at least one
-# cpu and memory node is set there. For restore it's OK
-# to copy parent masks into it, at the end we'll apply
-# "real" CT config
-#
-
-def cpuset_copy_parent(path, c):
-	c = "cpuset.%s" % c
-	ppath = os.path.dirname(path)
-	pfd = open(os.path.join(ppath, c))
-	cfd = open(os.path.join(path, c), "w")
-	cfd.write(pfd.read())
-	cfd.close()
-	pfd.close()
-
-def cpuset_allow_all(path):
-	cpuset_copy_parent(path, "cpus")
-	cpuset_copy_parent(path, "mems")
-
-def restore_one_controller(pid, ln):
-	cg_path = os.path.join(cg_root_dir, ln.strip())
-	print "[%s]" % cg_path
-	if not os.access(cg_path, os.F_OK):
-		os.makedirs(cg_path)
-	if ln.startswith("cpuset"):
-		cpuset_allow_all(cg_path)
-
-	tf = open(os.path.join(cg_path, cg_tasks_file), "w")
-	tf.write("%d" % pid)
-	tf.close()
-
-def restore_hier(pid, img):
-	print "\tCreate hier for %d from %s" % (pid, img)
-	fd = open(img)
-	for ln in fd:
-		restore_one_controller(pid, ln)
-	fd.close()
-	pass
diff --git a/p_haul_iters.py b/p_haul_iters.py
deleted file mode 100644
index 0463e78..0000000
--- a/p_haul_iters.py
+++ /dev/null
@@ -1,232 +0,0 @@
-#
-# The P.HAUL core -- the class that drives migration
-#
-
-import images
-import mstats
-import xem_rpc
-import rpc_pb2 as cr_rpc
-import criu_api
-import p_haul_type
-
-# Constants for iterations management
-#
-# Maximum number of iterations
-phaul_iter_max = 8
-# If we dump less than this amount of pages we abort
-# iterations and go do the full dump
-phaul_iter_min_size = 64
-# Each iteration should dump less pages or at most
-# this % more than previous
-phaul_iter_grow_max = 10
-
-class phaul_iter_worker:
-	def __init__(self, p_type, host):
-		self._mstat = mstats.migration_stats()
-		self.iteration = 0
-		self.prev_stats = None
-
-		print "Connecting to target host"
-		self.th = xem_rpc.rpc_proxy(host)
-		self.data_sk = self.th.open_socket("datask")
-
-		print "Setting up local"
-		self.img = images.phaul_images("dmp")
-		self.criu = criu_api.criu_conn(self.data_sk)
-		self.htype = p_haul_type.get_src(p_type)
-		if not self.htype:
-			raise Exception("No htype driver found")
-
-		self.fs = self.htype.get_fs()
-		if not self.fs:
-			raise Exception("No FS driver found")
-
-		self.pid = self.htype.root_task_pid()
-		self.fs.set_target_host(host)
-
-		print "Setting up remote"
-		self.th.setup(p_type)
-
-
-	def make_dump_req(self, typ):
-		#
-		# Prepare generic request for (pre)dump
-		#
-
-		req = cr_rpc.criu_req()
-		req.type = typ
-		req.opts.pid = self.pid
-		req.opts.ps.fd = self.criu.mem_sk_fileno()
-		req.opts.track_mem = True
-
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		p_img = self.img.prev_image_dir()
-		if p_img:
-			req.opts.parent_img = p_img
-		if not self.fs.persistent_inodes():
-			req.opts.force_irmap = True
-
-		return req
-
-	def set_options(self, opts):
-		self.th.set_options(opts)
-		self.criu.verbose(opts["verbose"])
-		self.img.set_options(opts)
-		self.htype.set_options(opts)
-		self.__force = opts["force"]
-
-	def validate_cpu(self):
-		print "Checking CPU compatibility"
-
-		print "  `- Dumping CPU info"
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.CPUINFO_DUMP
-		req.opts.images_dir_fd = self.img.work_dir_fd()
-		req.keep_open = True
-		resp = self.criu.send_req(req)
-		if not resp.success:
-			raise Exception("Can't dump cpuinfo")
-
-		print "  `- Sending CPU info"
-		self.img.send_cpuinfo(self.th, self.data_sk)
-
-		print "  `- Checking CPU info"
-		if not self.th.check_cpuinfo():
-			raise Exception("CPUs mismatch")
-
-	def start_migration(self):
-		self._mstat.start()
-
-		if not self.__force:
-			self.validate_cpu()
-
-		print "Preliminary FS migration"
-		self.fs.set_work_dir(self.img.work_dir())
-		self.fs.start_migration()
-
-		print "Starting iterations"
-		cc = self.criu
-
-		while True:
-			print "* Iteration %d" % self.iteration
-
-			self.th.start_iter()
-			self.img.new_image_dir()
-
-			print "\tIssuing pre-dump command to service"
-
-			req = self.make_dump_req(cr_rpc.PRE_DUMP)
-			resp = cc.send_req(req)
-			if not resp.success:
-				raise Exception("Pre-dump failed")
-
-			print "\tPre-dump succeeded"
-
-			self.th.end_iter()
-
-			stats = criu_api.criu_get_dstats(self.img)
-			self._mstat.iteration(stats)
-
-			#
-			# Need to decide whether we do next iteration
-			# or stop on the existing and go do full dump
-			# and restore
-			#
-
-			print "Checking iteration progress:"
-
-			if stats.pages_written <= phaul_iter_min_size:
-				print "\t> Small dump"
-				break;
-
-			if self.prev_stats:
-				w_add = stats.pages_written - self.prev_stats.pages_written
-				w_add = w_add * 100 / self.prev_stats.pages_written
-				if w_add > phaul_iter_grow_max:
-					print "\t> Iteration grows"
-					break
-
-			if self.iteration >= phaul_iter_max:
-				print "\t> Too many iterations"
-				break
-
-			self.iteration += 1
-			self.prev_stats = stats
-			print "\t> Proceed to next iteration"
-
-			self.fs.next_iteration()
-
-		#
-		# Finish with iterations -- do full dump, send images
-		# to target host and restore from them there
-		#
-
-		print "Final dump and restore"
-
-		self.th.start_iter()
-		self.img.new_image_dir()
-
-		print "\tIssuing dump command to service"
-		req = self.make_dump_req(cr_rpc.DUMP)
-		req.opts.notify_scripts = True
-		req.opts.file_locks = True
-		req.opts.evasive_devices = True
-		req.opts.link_remap = True
-		if self.htype.can_migrate_tcp():
-			req.opts.tcp_established = True
-
-		resp = cc.send_req(req)
-		while True:
-			if resp.type != cr_rpc.NOTIFY:
-				raise Exception("Dump failed")
-
-			if resp.notify.script == "post-dump":
-				#
-				# Dump is effectively over. Now CRIU
-				# waits for us to do whatever we want
-				# and keeps the tasks frozen.
-				#
-				break
-
-			elif resp.notify.script == "network-lock":
-				self.htype.net_lock()
-			elif resp.notify.script == "network-unlock":
-				self.htype.net_unlock()
-
-			print "\t\tNotify (%s)" % resp.notify.script
-			resp = cc.ack_notify()
-
-		print "Dump complete"
-		self.th.end_iter()
-
-		#
-		# Dump is complete -- go to target node,
-		# restore them there and kill (if required)
-		# tasks on source node
-		#
-
-		print "Final FS and images sync"
-		self.fs.stop_migration()
-		self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
-
-		print "Asking target host to restore"
-		self.th.restore_from_images()
-
-		#
-		# Ack the notify after restore -- CRIU would
-		# then terminate all tasks and send us back
-		# DUMP/success message
-		#
-
-		resp = cc.ack_notify()
-		if not resp.success:
-			raise Exception("Dump screwed up")
-
-		self.htype.umount()
-
-		stats = criu_api.criu_get_dstats(self.img)
-		self._mstat.iteration(stats)
-		self._mstat.stop(self)
-		self.img.close()
-		cc.close()
diff --git a/p_haul_lxc.py b/p_haul_lxc.py
deleted file mode 100644
index 27df698..0000000
--- a/p_haul_lxc.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#
-# LinuX Containers hauler module
-#
-
-import os
-import shutil
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-from subprocess import Popen, PIPE
-
-name = "lxc"
-lxc_dir = "/var/lib/lxc/"
-lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
-cg_image_name = "lxccg.img"
-
-class p_haul_type:
-	def __init__(self, name):
-		self._ctname = name
-		#
-		# This list would contain (v_in, v_out, v_br) tuples where
-		# v_in is the name of veth device in CT
-		# v_out is its peer on the host
-		# v_bridge is the bridge to which thie veth is attached
-		#
-		self._veths = []
-		self._cfg = {}
-
-	def __load_ct_config(self):
-		print "Loading config file from %s" % self.__ct_config()
-
-		self._cfg = {}
-		self._veths = []
-
-		veth = None
-
-		ifd = open(self.__ct_config())
-		for line in ifd:
-			if not ("=" in line):
-				continue
-			k, v = map(lambda a: a.strip(), line.split("=", 1))
-			self._cfg[k] = v
-
-			if k == "lxc.network.type":
-				if v != "veth":
-					raise Exception("Unsupported network device type: %s", v)
-				if veth:
-					self._veths.append(veth)
-				veth = util.net_dev()
-			elif k == "lxc.network.link":
-				veth.link = v
-			elif k == "lxc.network.name":
-				veth.name = v
-			elif k == "lxc.network.veth.pair":
-				veth.pair = v
-		if veth:
-			self._veths.append(veth)
-		ifd.close()
-
-	def __apply_cg_config(self):
-		print "Applying CT configs"
-		# FIXME -- implement
-		pass
-
-	def init_src(self):
-		self._fs_mounted = True
-		self._bridged = True
-		self.__load_ct_config()
-
-	def init_dst(self):
-		self._fs_mounted = False
-		self._bridged = False
-		self.__load_ct_config()
-
-	def set_options(self, opts):
-		pass
-
-	def root_task_pid(self):
-		pid = -1;
-
-		pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
-		for l in pd.stdout:
-			if l.startswith("PID:"):
-				pid = int(l.split(":")[1])
-		status = pd.wait()
-		if status:
-			raise Exception("lxc info -n %s failed: %d" %
-						(self._ctname, status))
-		if pid == -1:
-			raise Exception("CT isn't running")
-		return pid
-
-	def __ct_rootfs(self):
-		return self._cfg['lxc.rootfs']
-
-	def __ct_root(self):
-		return os.path.join(lxc_rootfs_dir, self._ctname)
-
-	def __ct_config(self):
-		return os.path.join(lxc_dir, self._ctname, "config")
-
-	#
-	# Meta-images for LXC -- container config and info about CGroups
-	#
-	def get_meta_images(self, dir):
-		cg_img = os.path.join(dir, cg_image_name)
-		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
-		cfg_name = self.__ct_config()
-		return [ (cfg_name, "config"),
-			 (cg_img, cg_image_name) ]
-
-	def put_meta_images(self, dir):
-		print "Putting config file into %s" % lxc_dir
-
-		shutil.copy(os.path.join(dir, "config"), self.__ct_config())
-
-		# Keep this name, we'll need one in prepare_ct()
-		self.cg_img = os.path.join(dir, cg_image_name)
-
-	#
-	# Create cgroup hierarchy and put root task into it
-	# Hierarchy is unlimited, we will apply config limitations
-	# in ->restored->__apply_cg_config later
-	#
-	def prepare_ct(self, pid):
-		p_haul_cgroup.restore_hier(pid, self.cg_img)
-
-	def mount(self):
-		nroot = self.__ct_root()
-		print "Mounting CT root to %s" % nroot
-		if not os.access(nroot, os.F_OK):
-			os.makedirs(nroot)
-		os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
-		self._fs_mounted = True
-		return nroot
-
-	def get_fs(self):
-		return fs_haul_shared.p_haul_fs()
-
-	def restored(self, pid):
-		self.__apply_cg_config()
-
-	def net_lock(self):
-		for veth in self._veths:
-			util.ifdown(veth.pair)
-
-	def net_unlock(self):
-		for veth in self._veths:
-			util.ifup(veth.pair)
-			if veth.link and not self._bridged:
-				util.bridge_add(veth.pair, veth.link)
-
-	def can_migrate_tcp(self):
-		return True
-
-	def umount(self):
-		pass
-
-	def veths(self):
-		#
-		# Caller wants to see list of tuples with [0] being name
-		# in CT and [1] being name on host. Just return existing
-		# tuples, the [2] with bridge name wouldn't hurt
-		#
-		return self._veths
diff --git a/p_haul_ovz.py b/p_haul_ovz.py
deleted file mode 100644
index 44ddbb5..0000000
--- a/p_haul_ovz.py
+++ /dev/null
@@ -1,190 +0,0 @@
-#
-# OpenVZ containers hauler module
-#
-
-import os
-import shutil
-import p_haul_cgroup
-import util
-import fs_haul_shared
-import fs_haul_subtree
-
-name = "ovz"
-vzpid_dir = "/var/lib/vzctl/vepid/"
-vz_dir = "/vz"
-vzpriv_dir = "%s/private" % vz_dir
-vzroot_dir = "%s/root" % vz_dir
-vz_conf_dir = "/etc/vz/conf/"
-vz_pidfiles = "/var/lib/vzctl/vepid/"
-cg_image_name = "ovzcg.img"
-
-class p_haul_type:
-	def __init__(self, ctid):
-		self._ctid = ctid
-		#
-		# This list would contain (v_in, v_out, v_br) tuples where
-		# v_in is the name of veth device in CT
-		# v_out is its peer on the host
-		# v_bridge is the bridge to which thie veth is attached
-		#
-		self._veths = []
-		self._cfg = []
-
-	def __load_ct_config(self, path):
-		print "Loading config file from %s" % path
-		ifd = open(os.path.join(path, self.__ct_config()))
-		for line in ifd:
-			self._cfg.append(line)
-
-			if line.startswith("NETIF="):
-				#
-				# Parse and keep veth pairs, later we will
-				# equip restore request with this data and
-				# will use it while (un)locking the network
-				#
-				v_in = None
-				v_out = None
-				v_bridge = None
-				vs = line.strip().split("=", 1)[1].strip("\"")
-				for parm in vs.split(","):
-					pa = parm.split("=")
-					if pa[0] == "ifname":
-						v_in = pa[1]
-					elif pa[0] == "host_ifname":
-						v_out = pa[1]
-					elif pa[0] == "bridge":
-						v_bridge = pa[1]
-
-				if v_in and v_out:
-					print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
-					veth = util.net_dev()
-					veth.name = v_in
-					veth.pair = v_out
-					veth.link = v_bridge
-					self._veths.append(veth)
-
-		ifd.close()
-
-	def __apply_cg_config(self):
-		print "Applying CT configs"
-		# FIXME -- implement
-		pass
-
-	def init_src(self):
-		self._fs_mounted = True
-		self._bridged = True
-		self.__load_ct_config(vz_conf_dir)
-
-	def init_dst(self):
-		self._fs_mounted = False
-		self._bridged = False
-
-	def set_options(self, opts):
-		pass
-
-	def root_task_pid(self):
-		pf = open(os.path.join(vzpid_dir, self._ctid))
-		pid = pf.read()
-		return int(pid)
-
-	def __ct_priv(self):
-		return "%s/%s" % (vzpriv_dir, self._ctid)
-
-	def __ct_root(self):
-		return "%s/%s" % (vzroot_dir, self._ctid)
-
-	def __ct_config(self):
-		return "%s.conf" % self._ctid
-
-	#
-	# Meta-images for OVZ -- container config and info about CGroups
-	#
-	def get_meta_images(self, path):
-		cg_img = os.path.join(path, cg_image_name)
-		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
-		cfg_name = self.__ct_config()
-		return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
-			 (cg_img, cg_image_name) ]
-
-	def put_meta_images(self, path):
-		print "Putting config file into %s" % vz_conf_dir
-
-		self.__load_ct_config(path)
-		ofd = open(os.path.join(vz_conf_dir, self.__ct_config()), "w")
-		ofd.writelines(self._cfg)
-		ofd.close()
-
-		# Keep this name, we'll need one in prepare_ct()
-		self.cg_img = os.path.join(path, cg_image_name)
-
-	#
-	# Create cgroup hierarchy and put root task into it
-	# Hierarchy is unlimited, we will apply config limitations
-	# in ->restored->__apply_cg_config later
-	#
-	def prepare_ct(self, pid):
-		p_haul_cgroup.restore_hier(pid, self.cg_img)
-
-	def __umount_root(self):
-		print "Umounting CT root"
-		os.system("umount %s" % self.__ct_root())
-		self._fs_mounted = False
-
-	def mount(self):
-		nroot = self.__ct_root()
-		print "Mounting CT root to %s" % nroot
-		if not os.access(nroot, os.F_OK):
-			os.makedirs(nroot)
-		os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
-		self._fs_mounted = True
-		return nroot
-
-	def umount(self):
-		if self._fs_mounted:
-			self.__umount_root()
-
-	def get_fs(self):
-		rootfs = util.path_to_fs(self.__ct_priv())
-		if not rootfs:
-			print "CT is on unknown FS"
-			return None
-
-		print "CT is on %s" % rootfs
-
-		if rootfs == "nfs":
-			return fs_haul_shared.p_haul_fs()
-		if rootfs == "ext3" or rootfs == "ext4":
-			return fs_haul_subtree.p_haul_fs(self.__ct_priv())
-
-		print "Unknown CT FS"
-		return None
-
-	def restored(self, pid):
-		print "Writing pidfile"
-		pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
-		pidfile.write("%d" % pid)
-		pidfile.close()
-
-		self.__apply_cg_config()
-
-	def net_lock(self):
-		for veth in self._veths:
-			util.ifdown(veth.pair)
-
-	def net_unlock(self):
-		for veth in self._veths:
-			util.ifup(veth.pair)
-			if veth.link and not self._bridged:
-				util.bridge_add(veth.pair, veth.link)
-
-	def can_migrate_tcp(self):
-		return True
-
-	def veths(self):
-		#
-		# Caller wants to see list of tuples with [0] being name
-		# in CT and [1] being name on host. Just return existing
-		# tuples, the [2] with bridge name wouldn't hurt
-		#
-		return self._veths
-
diff --git a/p_haul_pid.py b/p_haul_pid.py
deleted file mode 100644
index f0051b0..0000000
--- a/p_haul_pid.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Individual process hauler
-#
-
-import fs_haul_shared
-
-name = "pid"
-
-class p_haul_type:
-	def __init__(self, id):
-		self.pid = int(id)
-		self._pidfile = None
-
-
-	#
-	# Initialize itself for source node or destination one
-	#
-	def init_src(self):
-		pass
-	def init_dst(self):
-		pass
-
-	def set_options(self, opts):
-		self._pidfile = opts["dst_rpid"]
-		self._fs_root = opts["pid_root"]
-
-	# Report the pid of the root task of what we're
-	# goung to migrate
-	def root_task_pid(self):
-		return self.pid
-
-	# Prepare filesystem before restoring. Retrun
-	# the new root, if required. 'None' will mean
-	# that things will get restored in the current
-	# mount namespace and w/o chroot
-	def mount(self):
-		return self._fs_root
-
-	# Remove any specific FS setup
-	def umount(self):
-		pass
-
-	# Get driver for FS migration
-	def get_fs(self):
-		return fs_haul_shared.p_haul_fs()
-
-	# Get list of files which should be copied to
-	# the destination node. The dir argument is where
-	# temporary stuff can be put
-	def get_meta_images(self, dir):
-		return []
-
-	# Take your files from dir and put in whatever
-	# places are appropriate. Paths (relative) are
-	# preserved.
-	def put_meta_images(self, dir):
-		pass
-
-	# Things are started to get restored, only the
-	# first task (with pid @pid) is created.
-	def prepare_ct(self, pid):
-		pass
-
-	# Restoring done, the new top task has pid pid
-	def restored(self, pid):
-		if self._pidfile:
-			print "Writing rst pidfile"
-			open(self._pidfile, "w").writelines(["%d" % pid])
-
-	#
-	# Lock and unlock networking
-	#
-	def net_lock(self):
-		pass
-
-	def net_unlock(self):
-		pass
-
-	def can_migrate_tcp(self):
-		return False
-
-	# Get list of veth pairs if any
-	def veths(self):
-		return []
diff --git a/p_haul_service.py b/p_haul_service.py
deleted file mode 100644
index 5587bd7..0000000
--- a/p_haul_service.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#
-# P.HAUL code, that helps on the target node (rpyc service)
-#
-
-import xem_rpc
-import rpc_pb2 as cr_rpc
-import images
-import criu_api
-import p_haul_type
-
-class phaul_service:
-	def on_connect(self):
-		print "Connected"
-		self.dump_iter = 0
-		self.restored = False
-		self.criu = None
-		self.data_sk = None
-		self.img = None
-		self.htype = None
-
-	def on_disconnect(self):
-		print "Disconnected"
-		if self.criu:
-			self.criu.close()
-
-		if self.data_sk:
-			self.data_sk.close()
-
-		if self.htype and not self.restored:
-			self.htype.umount()
-
-		if self.img:
-			print "Closing images"
-			if not self.restored:
-				self.img.save_images()
-			self.img.close()
-
-	def on_socket_open(self, sk, uname):
-		self.data_sk = sk
-		print "Data socket (%s) accepted" % uname
-
-	def rpc_setup(self, htype_id):
-		print "Setting up service side", htype_id
-		self.img = images.phaul_images("rst")
-		self.criu = criu_api.criu_conn(self.data_sk)
-		self.htype = p_haul_type.get_dst(htype_id)
-
-	def rpc_set_options(self, opts):
-		self.criu.verbose(opts["verbose"])
-		self.img.set_options(opts)
-		self.htype.set_options(opts)
-
-	def start_page_server(self):
-		print "Starting page server for iter %d" % self.dump_iter
-
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.PAGE_SERVER
-		req.keep_open = True
-		req.opts.ps.fd = self.criu.mem_sk_fileno()
-
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		p_img = self.img.prev_image_dir()
-		if p_img:
-			req.opts.parent_img = p_img
-
-		print "\tSending criu rpc req"
-		resp = self.criu.send_req(req)
-		if not resp.success:
-			raise Exception("Failed to start page server")
-
-		print "\tPage server started at %d" % resp.ps.pid
-
-	def rpc_start_iter(self):
-		self.dump_iter += 1
-		self.img.new_image_dir()
-		self.start_page_server()
-
-	def rpc_end_iter(self):
-		pass
-
-	def rpc_start_accept_images(self, dir_id):
-		self.img.start_accept_images(dir_id, self.data_sk)
-
-	def rpc_stop_accept_images(self):
-		self.img.stop_accept_images()
-
-	def rpc_check_cpuinfo(self):
-		print "Checking cpuinfo"
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.CPUINFO_CHECK
-		req.opts.images_dir_fd = self.img.work_dir_fd()
-		req.keep_open = True
-		resp = self.criu.send_req(req)
-		print "   `-", resp.success
-		return resp.success
-
-	def rpc_restore_from_images(self):
-		print "Restoring from images"
-		self.htype.put_meta_images(self.img.image_dir())
-
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.RESTORE
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		req.opts.notify_scripts = True
-
-		if self.htype.can_migrate_tcp():
-			req.opts.tcp_established = True
-
-		for veth in self.htype.veths():
-			v = req.opts.veths.add()
-			v.if_in = veth.name
-			v.if_out = veth.pair
-
-		nroot = self.htype.mount()
-		if nroot:
-			req.opts.root = nroot
-			print "Restore root set to %s" % req.opts.root
-
-		cc = self.criu
-		resp = cc.send_req(req)
-		while True:
-			if resp.type == cr_rpc.NOTIFY:
-				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
-				if resp.notify.script == "setup-namespaces":
-					#
-					# At that point we have only one task
-					# living in namespaces and waiting for
-					# us to ACK the notify. Htype might want
-					# to configure namespace (external net
-					# devices) and cgroups
-					#
-					self.htype.prepare_ct(resp.notify.pid)
-				elif resp.notify.script == "network-unlock":
-					self.htype.net_unlock()
-				elif resp.notify.script == "network-lock":
-					raise Exception("Locking network on restore?")
-
-				resp = cc.ack_notify()
-				continue
-
-			if not resp.success:
-				raise Exception("Restore failed")
-
-			print "Restore succeeded"
-			break
-
-		self.htype.restored(resp.restore.pid)
-		self.restored = True
-
-	def rpc_restore_time(self):
-		stats = criu_api.criu_get_rstats(self.img)
-		return stats.restore_time
diff --git a/p_haul_type.py b/p_haul_type.py
deleted file mode 100644
index 6ac093d..0000000
--- a/p_haul_type.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Haulers' type selector
-# Types (htypes) are classes, that help hauling processes.
-# See p_haul_pid for comments of how a class should look like.
-#
-
-import p_haul_ovz
-import p_haul_pid
-import p_haul_lxc
-
-haul_types = {
-	p_haul_ovz.name: p_haul_ovz,
-	p_haul_pid.name: p_haul_pid,
-	p_haul_lxc.name: p_haul_lxc,
-}
-
-def __get(id):
-	if haul_types.has_key(id[0]):
-		h_type = haul_types[id[0]]
-		return h_type.p_haul_type(id[1])
-	else:
-		print "Unknown type. Try one of", haul_types.keys()
-	 	return None
-
-def get_src(id):
-	ht = __get(id)
-	ht.init_src()
-	return ht
-
-def get_dst(id):
-	ht = __get(id)
-	ht.init_dst()
-	return ht
diff --git a/phaul/.gitignore b/phaul/.gitignore
new file mode 100644
index 0000000..c8c86ef
--- /dev/null
+++ b/phaul/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*_pb2.py
diff --git a/phaul/__init__.py b/phaul/__init__.py
new file mode 100644
index 0000000..2ed0820
--- /dev/null
+++ b/phaul/__init__.py
@@ -0,0 +1,17 @@
+__all__ = [
+	"criu_api",
+	"fs_haul_shared",
+	"fs_haul_subtree",
+	"images",
+	"mstats",
+	"p_haul_cgroup",
+	"p_haul_iters",
+	"p_haul_lxc",
+	"p_haul_ovz",
+	"p_haul_pid",
+	"p_haul_service",
+	"p_haul_type",
+	"rpc_pb2",
+	"stats_pb2",
+	"util",
+	"xem_rpc"]
diff --git a/phaul/criu_api.py b/phaul/criu_api.py
new file mode 100644
index 0000000..146088a
--- /dev/null
+++ b/phaul/criu_api.py
@@ -0,0 +1,109 @@
+#
+# CRIU API
+# Includes class to work with CRIU service and helpers
+#
+
+import socket
+import struct
+import os
+import util
+import subprocess
+import rpc_pb2 as cr_rpc
+import stats_pb2 as crs
+
+criu_binary = "criu"
+
+req_types = {
+	cr_rpc.DUMP: "dump",
+	cr_rpc.PRE_DUMP: "pre_dump",
+	cr_rpc.PAGE_SERVER: "page_server",
+	cr_rpc.RESTORE: "restore",
+	cr_rpc.CPUINFO_DUMP: "cpuinfo-dump",
+	cr_rpc.CPUINFO_CHECK: "cpuinfo-check",
+}
+
+cpuinfo_img_name = "cpuinfo.img"
+
+def_verb = 2
+
+#
+# Connection to CRIU service
+#
+
+class criu_conn:
+	def __init__(self, mem_sk):
+		self._iter = 0
+		self.verb = def_verb
+		css = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+		util.set_cloexec(css[1])
+		print "`- Passing (ctl:%d, data:%d) pair to CRIU" % (css[0].fileno(), mem_sk.fileno())
+		self._swrk = subprocess.Popen([criu_binary, "swrk", "%d" % css[0].fileno()])
+		css[0].close()
+		self._cs = css[1]
+		self._last_req = -1
+		self._mem_fd = mem_sk.fileno()
+
+	def close(self):
+		self._cs.close()
+		self._swrk.wait()
+
+	def mem_sk_fileno(self):
+		return self._mem_fd
+
+	def verbose(self, level):
+		self.verb = level
+
+	def _recv_resp(self):
+		resp = cr_rpc.criu_resp()
+		resp.ParseFromString(self._cs.recv(1024))
+		if not resp.type in (cr_rpc.NOTIFY, self._last_req):
+			raise Exception("CRIU RPC error (%d/%d)" % (resp.type, self._last_req))
+
+		return resp
+
+	def send_req(self, req):
+		req.opts.log_level = self.verb
+		req.opts.log_file = "criu_%s.%d.log" % (req_types[req.type], self._iter)
+		self._cs.send(req.SerializeToString())
+		self._iter += 1
+		self._last_req = req.type
+
+		return self._recv_resp()
+
+	def ack_notify(self, success = True):
+		# Answer a CRIU notification; success is passed on as notify_success
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.NOTIFY
+		req.notify_success = success
+		self._cs.send(req.SerializeToString())
+		return self._recv_resp()
+
+#
+# Helper to read CRIU-generated statistics
+#
+
+CRIU_STATS_MAGIC = 0x57093306
+
+def criu_get_stats(img, file_name):
+	# Read and parse the stats file file_name found in img.work_dir()
+	s = struct.Struct("I I")
+	f = open(os.path.join(img.work_dir(), file_name))
+	try:
+		# Stats file is 4 bytes of magic, then 4 bytes with
+		# stats packet size
+		v = s.unpack(f.read(s.size))
+		if v[0] != CRIU_STATS_MAGIC:
+			raise Exception("Magic is %x, expect %x" % (v[0], CRIU_STATS_MAGIC))
+		stats = crs.stats_entry()
+		stats.ParseFromString(f.read(v[1]))
+	finally:
+		f.close()
+	return stats
+
+def criu_get_dstats(img):
+	stats = criu_get_stats(img, "stats-dump")
+	return stats.dump
+
+def criu_get_rstats(img):
+	stats = criu_get_stats(img, "stats-restore")
+	return stats.restore
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
new file mode 100644
index 0000000..54e231a
--- /dev/null
+++ b/phaul/fs_haul_shared.py
@@ -0,0 +1,28 @@
+#
+# Shared FS hauler (noop)
+#
+
+class p_haul_fs:
+	def __init__(self):
+		print "Initilized shared FS hauler"
+		pass
+
+	def set_target_host(self, thost):
+		pass
+
+	def set_work_dir(self, wdir):
+		pass
+
+	def start_migration(self):
+		pass
+
+	def next_iteration(self):
+		pass
+
+	def stop_migration(self):
+		pass
+
+	# Inode numbers do not change on this FS
+	# during migration
+	def persistent_inodes(self):
+		return True
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
new file mode 100644
index 0000000..3181c79
--- /dev/null
+++ b/phaul/fs_haul_subtree.py
@@ -0,0 +1,50 @@
+#
+# FS haul driver, that copies the subtree from
+# one node to another using rsync. It's used in
+# legacy OpenVZ configurations.
+#
+
+import subprocess as sp
+import os
+
+rsync_log_file = "rsync.log"
+
+class p_haul_fs:
+	def __init__(self, subtree_path):
+		print "Initialized subtree FS hauler (%s)" % subtree_path
+		self.__root = subtree_path
+		pass
+
+	def set_target_host(self, thost):
+		self.__thost = thost
+
+	def set_work_dir(self, wdir):
+		self.__wdir = wdir
+
+	def __run_rsync(self):
+		logf = open(os.path.join(self.__wdir, rsync_log_file), "w+")
+		dst = "%s:%s" % (self.__thost, os.path.dirname(self.__root))
+
+		# First rsync might be very long. Wait for it not
+		# to produce big pause between the 1st pre-dump and
+		# .stop_migration
+
+		ret = sp.call(["rsync", "-a", self.__root, dst],
+				stdout = logf, stderr = logf)
+		if ret != 0:
+			raise Exception("Rsync failed")
+
+	def start_migration(self):
+		print "Starting FS migration"
+		self.__run_rsync()
+
+	def next_iteration(self):
+		pass
+
+	def stop_migration(self):
+		print "Doing final FS sync"
+		self.__run_rsync()
+
+	# When rsync-ing FS inodes number will change
+	def persistent_inodes(self):
+		return False
diff --git a/phaul/images.py b/phaul/images.py
new file mode 100644
index 0000000..a4d465c
--- /dev/null
+++ b/phaul/images.py
@@ -0,0 +1,176 @@
+#
+# images driver for migration (without FS transfer)
+#
+
+import os
+import tempfile
+import tarfile
+import time
+import shutil
+import time
+import threading
+import util
+import criu_api
+
+def_path = "/var/local/p.haul-fs/"
+
+class opendir:
+	# Directory handle: keeps the path together with an O_DIRECTORY fd
+	def __init__(self, path):
+		self._dirname = path
+		self._dirfd = os.open(path, os.O_DIRECTORY)
+		util.set_cloexec(self)
+
+	def close(self):
+		os.close(self._dirfd)
+		# Path is kept on purpose: phaul_images.close() calls name() after close()
+		self._dirfd = -1
+
+	def name(self):
+		return self._dirname
+	def fileno(self):
+		return self._dirfd
+
+class untar_thread(threading.Thread):
+	def __init__(self, sk, tdir):
+		threading.Thread.__init__(self)
+		self.__sk = sk
+		self.__dir = tdir
+
+	def run(self):
+		tf = tarfile.open(mode = "r|", fileobj = self.__sk.makefile())
+		tf.extractall(self.__dir)
+		tf.close()
+
+class img_tar:
+	def __init__(self, sk, dirname):
+		self.__tf = tarfile.open(mode = "w|", fileobj = sk.makefile())
+		self.__dir = dirname
+
+	def add(self, img, path = None):
+		if not path:
+			path = os.path.join(self.__dir, img)
+
+		self.__tf.add(path, img)
+
+	def close(self):
+		self.__tf.close()
+
+class phaul_images:
+	WDIR = 1
+	IMGDIR = 2
+
+	def __init__(self, typ):
+		self.current_iter = 0
+		self.sync_time = 0.0
+		self._typ = typ
+		self._keep_on_close = False
+		self._wdir = None
+		self._current_dir = None
+
+	def save_images(self):
+		print "Keeping images"
+		self._keep_on_close = True
+
+	def set_options(self, opts):
+		self._keep_on_close = opts["keep_images"]
+
+		suf = time.strftime("-%y.%m.%d-%H.%M", time.localtime())
+		util.makedirs(opts["img_path"])
+		wdir = tempfile.mkdtemp(suf, "%s-" % self._typ, opts["img_path"])
+		self._wdir = opendir(wdir)
+		self._img_path = os.path.join(self._wdir.name(), "img")
+		os.mkdir(self._img_path)
+
+	def close(self):
+		if not self._wdir:
+			return
+
+		self._wdir.close()
+		if self._current_dir:
+			self._current_dir.close()
+
+		if not self._keep_on_close:
+			print "Removing images"
+			shutil.rmtree(self._wdir.name())
+		else:
+			print "Images are kept in %s" % self._wdir.name()
+		pass
+
+	def img_sync_time(self):
+		return self.sync_time
+
+	def new_image_dir(self):
+		if self._current_dir:
+			self._current_dir.close()
+		self.current_iter += 1
+		img_dir = "%s/%d" % (self._img_path, self.current_iter)
+		print "\tMaking directory %s" % img_dir
+		os.mkdir(img_dir)
+		self._current_dir = opendir(img_dir)
+
+	def image_dir_fd(self):
+		return self._current_dir.fileno()
+
+	def work_dir_fd(self):
+		return self._wdir.fileno()
+
+	def image_dir(self):
+		return self._current_dir.name()
+
+	def work_dir(self):
+		return self._wdir.name()
+
+	def prev_image_dir(self):
+		if self.current_iter == 1:
+			return None
+		else:
+			return "../%d" % (self.current_iter - 1)
+
+	# Images transfer
+	# Are there better ways for doing this?
+
+	def sync_imgs_to_target(self, th, htype, sock):
+		# Pre-dump doesn't generate any images (yet?)
+		# so copy only those from the top dir
+		print "Sending images to target"
+
+		start = time.time()
+		cdir = self.image_dir()
+
+		th.start_accept_images(phaul_images.IMGDIR)
+		tf = img_tar(sock, cdir)
+
+		print "\tPack"
+		for img in filter(lambda x: x.endswith(".img"), os.listdir(cdir)):
+			tf.add(img)
+
+		print "\tAdd htype images"
+		for himg in htype.get_meta_images(cdir):
+			tf.add(himg[1], himg[0])
+
+		tf.close()
+		th.stop_accept_images()
+
+		self.sync_time = time.time() - start
+
+	def send_cpuinfo(self, th, sock):
+		th.start_accept_images(phaul_images.WDIR)
+		tf = img_tar(sock, self.work_dir())
+		tf.add(criu_api.cpuinfo_img_name)
+		tf.close()
+		th.stop_accept_images()
+
+	def start_accept_images(self, dir_id, sk):
+		if dir_id == phaul_images.WDIR:
+			dirname = self.work_dir()
+		else:
+			dirname = self.image_dir()
+
+		self.__acc_tar = untar_thread(sk, dirname)
+		self.__acc_tar.start()
+		print "Started images server"
+
+	def stop_accept_images(self):
+		print "Waiting for images to unpack"
+		self.__acc_tar.join()
diff --git a/phaul/mstats.py b/phaul/mstats.py
new file mode 100644
index 0000000..f1a9c32
--- /dev/null
+++ b/phaul/mstats.py
@@ -0,0 +1,33 @@
+import time
+
+def usec2sec(usec):
+	return usec / 1000000.
+
+class migration_stats:
+	def __init__(self):
+		self._iter_fr_times = []
+		self._frozen_time = 0
+
+	def start(self):
+		self._start_time = time.time()
+
+	def stop(self, iters):
+		self._rst_time = iters.th.restore_time()
+		self._img_sync_time = iters.img.img_sync_time()
+		self._end_time = time.time()
+
+		self._print_stats()
+
+	def iteration(self, stats):
+		print "Dumped %d pages, %d skipped" % \
+				(stats.pages_written, stats.pages_skipped_parent)
+
+		self._iter_fr_times.append("%.2lf" % usec2sec(stats.frozen_time))
+		self._frozen_time += stats.frozen_time
+
+	def _print_stats(self):
+		print "Migration succeeded"
+		print "\t   total time is ~%.2lf sec" % (self._end_time - self._start_time)
+		print "\t  frozen time is ~%.2lf sec (" % usec2sec(self._frozen_time), self._iter_fr_times, ")"
+		print "\t restore time is ~%.2lf sec" % usec2sec(self._rst_time)
+		print "\timg sync time is ~%.2lf sec" % (self._img_sync_time)
diff --git a/phaul/p_haul_cgroup.py b/phaul/p_haul_cgroup.py
new file mode 100644
index 0000000..846ee62
--- /dev/null
+++ b/phaul/p_haul_cgroup.py
@@ -0,0 +1,79 @@
+#
+# CGroups manipulations for p.haul.
+#
+# FIXME Isn't it nicer to do it via libcgroup?
+#
+
+import os
+
+cg_root_dir = "/sys/fs/cgroup"
+cg_tasks_file = "tasks"
+
+def cg_line_parse(ln):
+	items = ln.split(":")
+	#
+	# If two controllers are merged together, we see
+	# their names comma-separated in proc. The problem
+	# is that the respective directory name in sysfs
+	# (!) IS NOT THE SAME, controller names can go
+	# reversed.
+	#
+	# That said, we just use the first name component,
+	# in sysfs there would be the respective symlink
+	#
+	cname = items[1].split(",")[0]
+	cdir = items[2]
+
+	return cname, cdir
+
+def dump_hier(pid, img):
+	print "\tSave CG for %d into %s" % (pid, img)
+	fd = open(img, "w")
+	cg = open("/proc/%d/cgroup" % pid)
+	for ln in cg:
+		cg_controller, cg_dir = cg_line_parse(ln)
+		if not cg_controller.startswith("name="):
+			fd.write("%s%s" % (cg_controller, cg_dir))
+
+	cg.close()
+	fd.close()
+
+#
+# The cpuset controller is unusable before at least one
+# cpu and memory node is set there. For restore it's OK
+# to copy parent masks into it, at the end we'll apply
+# "real" CT config
+#
+
+def cpuset_copy_parent(path, c):
+	c = "cpuset.%s" % c
+	ppath = os.path.dirname(path)
+	pfd = open(os.path.join(ppath, c))
+	cfd = open(os.path.join(path, c), "w")
+	cfd.write(pfd.read())
+	cfd.close()
+	pfd.close()
+
+def cpuset_allow_all(path):
+	cpuset_copy_parent(path, "cpus")
+	cpuset_copy_parent(path, "mems")
+
+def restore_one_controller(pid, ln):
+	cg_path = os.path.join(cg_root_dir, ln.strip())
+	print "[%s]" % cg_path
+	if not os.access(cg_path, os.F_OK):
+		os.makedirs(cg_path)
+	if ln.startswith("cpuset"):
+		cpuset_allow_all(cg_path)
+
+	tf = open(os.path.join(cg_path, cg_tasks_file), "w")
+	tf.write("%d" % pid)
+	tf.close()
+
+def restore_hier(pid, img):
+	print "\tCreate hier for %d from %s" % (pid, img)
+	fd = open(img)
+	for ln in fd:
+		restore_one_controller(pid, ln)
+	fd.close()
+	pass
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
new file mode 100644
index 0000000..0463e78
--- /dev/null
+++ b/phaul/p_haul_iters.py
@@ -0,0 +1,232 @@
+#
+# The P.HAUL core -- the class that drives migration
+#
+
+import images
+import mstats
+import xem_rpc
+import rpc_pb2 as cr_rpc
+import criu_api
+import p_haul_type
+
+# Constants for iterations management
+#
+# Maximum number of iterations
+phaul_iter_max = 8
+# If we dump less than this amount of pages we abort
+# iterations and go do the full dump
+phaul_iter_min_size = 64
+# Each iteration should dump less pages or at most
+# this % more than previous
+phaul_iter_grow_max = 10
+
+class phaul_iter_worker:
+	def __init__(self, p_type, host):
+		self._mstat = mstats.migration_stats()
+		self.iteration = 0
+		self.prev_stats = None
+
+		print "Connecting to target host"
+		self.th = xem_rpc.rpc_proxy(host)
+		self.data_sk = self.th.open_socket("datask")
+
+		print "Setting up local"
+		self.img = images.phaul_images("dmp")
+		self.criu = criu_api.criu_conn(self.data_sk)
+		self.htype = p_haul_type.get_src(p_type)
+		if not self.htype:
+			raise Exception("No htype driver found")
+
+		self.fs = self.htype.get_fs()
+		if not self.fs:
+			raise Exception("No FS driver found")
+
+		self.pid = self.htype.root_task_pid()
+		self.fs.set_target_host(host)
+
+		print "Setting up remote"
+		self.th.setup(p_type)
+
+
+	def make_dump_req(self, typ):
+		#
+		# Prepare generic request for (pre)dump
+		#
+
+		req = cr_rpc.criu_req()
+		req.type = typ
+		req.opts.pid = self.pid
+		req.opts.ps.fd = self.criu.mem_sk_fileno()
+		req.opts.track_mem = True
+
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		p_img = self.img.prev_image_dir()
+		if p_img:
+			req.opts.parent_img = p_img
+		if not self.fs.persistent_inodes():
+			req.opts.force_irmap = True
+
+		return req
+
+	def set_options(self, opts):
+		self.th.set_options(opts)
+		self.criu.verbose(opts["verbose"])
+		self.img.set_options(opts)
+		self.htype.set_options(opts)
+		self.__force = opts["force"]
+
+	def validate_cpu(self):
+		print "Checking CPU compatibility"
+
+		print "  `- Dumping CPU info"
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.CPUINFO_DUMP
+		req.opts.images_dir_fd = self.img.work_dir_fd()
+		req.keep_open = True
+		resp = self.criu.send_req(req)
+		if not resp.success:
+			raise Exception("Can't dump cpuinfo")
+
+		print "  `- Sending CPU info"
+		self.img.send_cpuinfo(self.th, self.data_sk)
+
+		print "  `- Checking CPU info"
+		if not self.th.check_cpuinfo():
+			raise Exception("CPUs mismatch")
+
+	def start_migration(self):
+		self._mstat.start()
+
+		if not self.__force:
+			self.validate_cpu()
+
+		print "Preliminary FS migration"
+		self.fs.set_work_dir(self.img.work_dir())
+		self.fs.start_migration()
+
+		print "Starting iterations"
+		cc = self.criu
+
+		while True:
+			print "* Iteration %d" % self.iteration
+
+			self.th.start_iter()
+			self.img.new_image_dir()
+
+			print "\tIssuing pre-dump command to service"
+
+			req = self.make_dump_req(cr_rpc.PRE_DUMP)
+			resp = cc.send_req(req)
+			if not resp.success:
+				raise Exception("Pre-dump failed")
+
+			print "\tPre-dump succeeded"
+
+			self.th.end_iter()
+
+			stats = criu_api.criu_get_dstats(self.img)
+			self._mstat.iteration(stats)
+
+			#
+			# Need to decide whether we do next iteration
+			# or stop on the existing and go do full dump
+			# and restore
+			#
+
+			print "Checking iteration progress:"
+
+			if stats.pages_written <= phaul_iter_min_size:
+				print "\t> Small dump"
+				break;
+
+			if self.prev_stats:
+				w_add = stats.pages_written - self.prev_stats.pages_written
+				w_add = w_add * 100 / self.prev_stats.pages_written
+				if w_add > phaul_iter_grow_max:
+					print "\t> Iteration grows"
+					break
+
+			if self.iteration >= phaul_iter_max:
+				print "\t> Too many iterations"
+				break
+
+			self.iteration += 1
+			self.prev_stats = stats
+			print "\t> Proceed to next iteration"
+
+			self.fs.next_iteration()
+
+		#
+		# Finish with iterations -- do full dump, send images
+		# to target host and restore from them there
+		#
+
+		print "Final dump and restore"
+
+		self.th.start_iter()
+		self.img.new_image_dir()
+
+		print "\tIssuing dump command to service"
+		req = self.make_dump_req(cr_rpc.DUMP)
+		req.opts.notify_scripts = True
+		req.opts.file_locks = True
+		req.opts.evasive_devices = True
+		req.opts.link_remap = True
+		if self.htype.can_migrate_tcp():
+			req.opts.tcp_established = True
+
+		resp = cc.send_req(req)
+		while True:
+			if resp.type != cr_rpc.NOTIFY:
+				raise Exception("Dump failed")
+
+			if resp.notify.script == "post-dump":
+				#
+				# Dump is effectively over. Now CRIU
+				# waits for us to do whatever we want
+				# and keeps the tasks frozen.
+				#
+				break
+
+			elif resp.notify.script == "network-lock":
+				self.htype.net_lock()
+			elif resp.notify.script == "network-unlock":
+				self.htype.net_unlock()
+
+			print "\t\tNotify (%s)" % resp.notify.script
+			resp = cc.ack_notify()
+
+		print "Dump complete"
+		self.th.end_iter()
+
+		#
+		# Dump is complete -- go to target node,
+		# restore them there and kill (if required)
+		# tasks on source node
+		#
+
+		print "Final FS and images sync"
+		self.fs.stop_migration()
+		self.img.sync_imgs_to_target(self.th, self.htype, self.data_sk)
+
+		print "Asking target host to restore"
+		self.th.restore_from_images()
+
+		#
+		# Ack the notify after restore -- CRIU would
+		# then terminate all tasks and send us back
+		# DUMP/success message
+		#
+
+		resp = cc.ack_notify()
+		if not resp.success:
+			raise Exception("Dump screwed up")
+
+		self.htype.umount()
+
+		stats = criu_api.criu_get_dstats(self.img)
+		self._mstat.iteration(stats)
+		self._mstat.stop(self)
+		self.img.close()
+		cc.close()
diff --git a/phaul/p_haul_lxc.py b/phaul/p_haul_lxc.py
new file mode 100644
index 0000000..27df698
--- /dev/null
+++ b/phaul/p_haul_lxc.py
@@ -0,0 +1,166 @@
+#
+# LinuX Containers hauler module
+#
+
+import os
+import shutil
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+from subprocess import Popen, PIPE
+
+name = "lxc"
+lxc_dir = "/var/lib/lxc/"
+lxc_rootfs_dir = "/usr/lib64/lxc/rootfs"
+cg_image_name = "lxccg.img"
+
+class p_haul_type:
+	def __init__(self, name):
+		self._ctname = name
+		#
+		# This list would contain util.net_dev objects where
+		# .name is the name of veth device in CT
+		# .pair is its peer on the host
+		# .link is the bridge to which this veth is attached
+		#
+		self._veths = []
+		self._cfg = {}
+
+	def __load_ct_config(self):
+		# Parse "key = value" lines of the CT config into _cfg and _veths
+		print "Loading config file from %s" % self.__ct_config()
+
+		self._cfg = {}
+		self._veths = []
+		veth = None
+
+		ifd = open(self.__ct_config())
+		for line in ifd:
+			if not ("=" in line):
+				continue
+			k, v = map(lambda a: a.strip(), line.split("=", 1))
+			self._cfg[k] = v
+
+			if k == "lxc.network.type":
+				if v != "veth":
+					raise Exception("Unsupported network device type: %s" % v)
+				if veth:
+					self._veths.append(veth)
+				veth = util.net_dev()
+			elif k == "lxc.network.link":
+				veth.link = v
+			elif k == "lxc.network.name":
+				veth.name = v
+			elif k == "lxc.network.veth.pair":
+				veth.pair = v
+		if veth:
+			self._veths.append(veth)
+		ifd.close()
+
+	def __apply_cg_config(self):
+		print "Applying CT configs"
+		# FIXME -- implement
+		pass
+
+	def init_src(self):
+		self._fs_mounted = True
+		self._bridged = True
+		self.__load_ct_config()
+
+	def init_dst(self):
+		self._fs_mounted = False
+		self._bridged = False
+		self.__load_ct_config()
+
+	def set_options(self, opts):
+		pass
+
+	def root_task_pid(self):
+		pid = -1;
+
+		pd = Popen(["lxc-info", "-n", self._ctname], stdout = PIPE)
+		for l in pd.stdout:
+			if l.startswith("PID:"):
+				pid = int(l.split(":")[1])
+		status = pd.wait()
+		if status:
+			raise Exception("lxc info -n %s failed: %d" %
+						(self._ctname, status))
+		if pid == -1:
+			raise Exception("CT isn't running")
+		return pid
+
+	def __ct_rootfs(self):
+		return self._cfg['lxc.rootfs']
+
+	def __ct_root(self):
+		return os.path.join(lxc_rootfs_dir, self._ctname)
+
+	def __ct_config(self):
+		return os.path.join(lxc_dir, self._ctname, "config")
+
+	#
+	# Meta-images for LXC -- container config and info about CGroups
+	#
+	def get_meta_images(self, dir):
+		cg_img = os.path.join(dir, cg_image_name)
+		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+		cfg_name = self.__ct_config()
+		return [ (cfg_name, "config"),
+			 (cg_img, cg_image_name) ]
+
+	def put_meta_images(self, dir):
+		print "Putting config file into %s" % lxc_dir
+
+		shutil.copy(os.path.join(dir, "config"), self.__ct_config())
+
+		# Keep this name, we'll need one in prepare_ct()
+		self.cg_img = os.path.join(dir, cg_image_name)
+
+	#
+	# Create cgroup hierarchy and put root task into it
+	# Hierarchy is unlimited, we will apply config limitations
+	# in ->restored->__apply_cg_config later
+	#
+	def prepare_ct(self, pid):
+		p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+	def mount(self):
+		nroot = self.__ct_root()
+		print "Mounting CT root to %s" % nroot
+		if not os.access(nroot, os.F_OK):
+			os.makedirs(nroot)
+		os.system("mount --bind %s %s" % (self.__ct_rootfs(), nroot))
+		self._fs_mounted = True
+		return nroot
+
+	def get_fs(self):
+		return fs_haul_shared.p_haul_fs()
+
+	def restored(self, pid):
+		self.__apply_cg_config()
+
+	def net_lock(self):
+		for veth in self._veths:
+			util.ifdown(veth.pair)
+
+	def net_unlock(self):
+		for veth in self._veths:
+			util.ifup(veth.pair)
+			if veth.link and not self._bridged:
+				util.bridge_add(veth.pair, veth.link)
+
+	def can_migrate_tcp(self):
+		return True
+
+	def umount(self):
+		pass
+
+	def veths(self):
+		#
+		# Caller wants to see list of tuples with [0] being name
+		# in CT and [1] being name on host. Just return existing
+		# tuples, the [2] with bridge name wouldn't hurt
+		#
+		return self._veths
diff --git a/phaul/p_haul_ovz.py b/phaul/p_haul_ovz.py
new file mode 100644
index 0000000..44ddbb5
--- /dev/null
+++ b/phaul/p_haul_ovz.py
@@ -0,0 +1,190 @@
+#
+# OpenVZ containers hauler module
+#
+
+import os
+import shutil
+import p_haul_cgroup
+import util
+import fs_haul_shared
+import fs_haul_subtree
+
+name = "ovz"
+vzpid_dir = "/var/lib/vzctl/vepid/"
+vz_dir = "/vz"
+vzpriv_dir = "%s/private" % vz_dir
+vzroot_dir = "%s/root" % vz_dir
+vz_conf_dir = "/etc/vz/conf/"
+vz_pidfiles = "/var/lib/vzctl/vepid/"
+cg_image_name = "ovzcg.img"
+
+class p_haul_type:
+	def __init__(self, ctid):
+		self._ctid = ctid
+		#
+		# This list would contain util.net_dev objects where
+		# .name is the name of veth device in CT
+		# .pair is its peer on the host
+		# .link is the bridge to which this veth is attached
+		#
+		self._veths = []
+		self._cfg = []
+
+	def __load_ct_config(self, path):
+		print "Loading config file from %s" % path
+		ifd = open(os.path.join(path, self.__ct_config()))
+		for line in ifd:
+			self._cfg.append(line)
+
+			if line.startswith("NETIF="):
+				#
+				# Parse and keep veth pairs, later we will
+				# equip restore request with this data and
+				# will use it while (un)locking the network
+				#
+				v_in = None
+				v_out = None
+				v_bridge = None
+				vs = line.strip().split("=", 1)[1].strip("\"")
+				for parm in vs.split(","):
+					pa = parm.split("=")
+					if pa[0] == "ifname":
+						v_in = pa[1]
+					elif pa[0] == "host_ifname":
+						v_out = pa[1]
+					elif pa[0] == "bridge":
+						v_bridge = pa[1]
+
+				if v_in and v_out:
+					print "\tCollect %s -> %s (%s) veth" % (v_in, v_out, v_bridge)
+					veth = util.net_dev()
+					veth.name = v_in
+					veth.pair = v_out
+					veth.link = v_bridge
+					self._veths.append(veth)
+
+		ifd.close()
+
+	def __apply_cg_config(self):
+		print "Applying CT configs"
+		# FIXME -- implement
+		pass
+
+	def init_src(self):
+		self._fs_mounted = True
+		self._bridged = True
+		self.__load_ct_config(vz_conf_dir)
+
+	def init_dst(self):
+		self._fs_mounted = False
+		self._bridged = False
+
+	def set_options(self, opts):
+		pass
+
+	def root_task_pid(self):
+		pf = open(os.path.join(vzpid_dir, self._ctid))
+		pid = pf.read()
+		return int(pid)
+
+	def __ct_priv(self):
+		return "%s/%s" % (vzpriv_dir, self._ctid)
+
+	def __ct_root(self):
+		return "%s/%s" % (vzroot_dir, self._ctid)
+
+	def __ct_config(self):
+		return "%s.conf" % self._ctid
+
+	#
+	# Meta-images for OVZ -- container config and info about CGroups
+	#
+	def get_meta_images(self, path):
+		cg_img = os.path.join(path, cg_image_name)
+		p_haul_cgroup.dump_hier(self.root_task_pid(), cg_img)
+		cfg_name = self.__ct_config()
+		return [ (os.path.join(vz_conf_dir, cfg_name), cfg_name), \
+			 (cg_img, cg_image_name) ]
+
+	def put_meta_images(self, path):
+		print "Putting config file into %s" % vz_conf_dir
+
+		self.__load_ct_config(path)
+		ofd = open(os.path.join(vz_conf_dir, self.__ct_config()), "w")
+		ofd.writelines(self._cfg)
+		ofd.close()
+
+		# Keep this name, we'll need one in prepare_ct()
+		self.cg_img = os.path.join(path, cg_image_name)
+
+	#
+	# Create cgroup hierarchy and put root task into it
+	# Hierarchy is unlimited, we will apply config limitations
+	# in ->restored->__apply_cg_config later
+	#
+	def prepare_ct(self, pid):
+		p_haul_cgroup.restore_hier(pid, self.cg_img)
+
+	def __umount_root(self):
+		print "Umounting CT root"
+		os.system("umount %s" % self.__ct_root())
+		self._fs_mounted = False
+
+	def mount(self):
+		nroot = self.__ct_root()
+		print "Mounting CT root to %s" % nroot
+		if not os.access(nroot, os.F_OK):
+			os.makedirs(nroot)
+		os.system("mount --bind %s %s" % (self.__ct_priv(), nroot))
+		self._fs_mounted = True
+		return nroot
+
+	def umount(self):
+		if self._fs_mounted:
+			self.__umount_root()
+
+	def get_fs(self):
+		rootfs = util.path_to_fs(self.__ct_priv())
+		if not rootfs:
+			print "CT is on unknown FS"
+			return None
+
+		print "CT is on %s" % rootfs
+
+		if rootfs == "nfs":
+			return fs_haul_shared.p_haul_fs()
+		if rootfs == "ext3" or rootfs == "ext4":
+			return fs_haul_subtree.p_haul_fs(self.__ct_priv())
+
+		print "Unknown CT FS"
+		return None
+
+	def restored(self, pid):
+		print "Writing pidfile"
+		pidfile = open(os.path.join(vz_pidfiles, self._ctid), 'w')
+		pidfile.write("%d" % pid)
+		pidfile.close()
+
+		self.__apply_cg_config()
+
+	def net_lock(self):
+		for veth in self._veths:
+			util.ifdown(veth.pair)
+
+	def net_unlock(self):
+		for veth in self._veths:
+			util.ifup(veth.pair)
+			if veth.link and not self._bridged:
+				util.bridge_add(veth.pair, veth.link)
+
+	def can_migrate_tcp(self):
+		return True
+
+	def veths(self):
+		#
+		# Caller wants to see list of tuples with [0] being name
+		# in CT and [1] being name on host. Just return existing
+		# tuples, the [2] with bridge name wouldn't hurt
+		#
+		return self._veths
+
diff --git a/phaul/p_haul_pid.py b/phaul/p_haul_pid.py
new file mode 100644
index 0000000..f0051b0
--- /dev/null
+++ b/phaul/p_haul_pid.py
@@ -0,0 +1,84 @@
+#
+# Individual process hauler
+#
+
+import fs_haul_shared
+
+name = "pid"
+
+class p_haul_type:
+	def __init__(self, id):
+		self.pid = int(id)
+		self._pidfile = None
+
+
+	#
+	# Initialize itself for source node or destination one
+	#
+	def init_src(self):
+		pass
+	def init_dst(self):
+		pass
+
+	def set_options(self, opts):
+		self._pidfile = opts["dst_rpid"]
+		self._fs_root = opts["pid_root"]
+
+	# Report the pid of the root task of what we're
+	# going to migrate
+	def root_task_pid(self):
+		return self.pid
+
+	# Prepare filesystem before restoring. Return
+	# the new root, if required. 'None' will mean
+	# that things will get restored in the current
+	# mount namespace and w/o chroot
+	def mount(self):
+		return self._fs_root
+
+	# Remove any specific FS setup
+	def umount(self):
+		pass
+
+	# Get driver for FS migration
+	def get_fs(self):
+		return fs_haul_shared.p_haul_fs()
+
+	# Get list of files which should be copied to
+	# the destination node. The dir argument is where
+	# temporary stuff can be put
+	def get_meta_images(self, dir):
+		return []
+
+	# Take your files from dir and put in whatever
+	# places are appropriate. Paths (relative) are
+	# preserved.
+	def put_meta_images(self, dir):
+		pass
+
+	# Restore has started, only the first task
+	# (with pid @pid) has been created so far.
+	def prepare_ct(self, pid):
+		pass
+
+	# Restoring done, the new top task has pid @pid
+	def restored(self, pid):
+		if self._pidfile:
+			print "Writing rst pidfile"
+			open(self._pidfile, "w").writelines(["%d" % pid])
+
+	#
+	# Lock and unlock networking
+	#
+	def net_lock(self):
+		pass
+
+	def net_unlock(self):
+		pass
+
+	def can_migrate_tcp(self):
+		return False
+
+	# Get list of veth pairs if any
+	def veths(self):
+		return []
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
new file mode 100644
index 0000000..5587bd7
--- /dev/null
+++ b/phaul/p_haul_service.py
@@ -0,0 +1,154 @@
+#
+# P.HAUL code that helps on the target node (xem_rpc service)
+#
+
+import xem_rpc
+import rpc_pb2 as cr_rpc
+import images
+import criu_api
+import p_haul_type
+
+class phaul_service:
+	def on_connect(self):
+		print "Connected"
+		self.dump_iter = 0
+		self.restored = False
+		self.criu = None
+		self.data_sk = None
+		self.img = None
+		self.htype = None
+
+	def on_disconnect(self):
+		print "Disconnected"
+		if self.criu:
+			self.criu.close()
+
+		if self.data_sk:
+			self.data_sk.close()
+
+		if self.htype and not self.restored:
+			self.htype.umount()
+
+		if self.img:
+			print "Closing images"
+			if not self.restored:
+				self.img.save_images()
+			self.img.close()
+
+	def on_socket_open(self, sk, uname):
+		self.data_sk = sk
+		print "Data socket (%s) accepted" % uname
+
+	def rpc_setup(self, htype_id):
+		print "Setting up service side", htype_id
+		self.img = images.phaul_images("rst")
+		self.criu = criu_api.criu_conn(self.data_sk)
+		self.htype = p_haul_type.get_dst(htype_id)
+
+	def rpc_set_options(self, opts):
+		self.criu.verbose(opts["verbose"])
+		self.img.set_options(opts)
+		self.htype.set_options(opts)
+
+	def start_page_server(self):
+		print "Starting page server for iter %d" % self.dump_iter
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.PAGE_SERVER
+		req.keep_open = True
+		req.opts.ps.fd = self.criu.mem_sk_fileno()
+
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		p_img = self.img.prev_image_dir()
+		if p_img:
+			req.opts.parent_img = p_img
+
+		print "\tSending criu rpc req"
+		resp = self.criu.send_req(req)
+		if not resp.success:
+			raise Exception("Failed to start page server")
+
+		print "\tPage server started at %d" % resp.ps.pid
+
+	def rpc_start_iter(self):
+		self.dump_iter += 1
+		self.img.new_image_dir()
+		self.start_page_server()
+
+	def rpc_end_iter(self):
+		pass
+
+	def rpc_start_accept_images(self, dir_id):
+		self.img.start_accept_images(dir_id, self.data_sk)
+
+	def rpc_stop_accept_images(self):
+		self.img.stop_accept_images()
+
+	def rpc_check_cpuinfo(self):
+		print "Checking cpuinfo"
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.CPUINFO_CHECK
+		req.opts.images_dir_fd = self.img.work_dir_fd()
+		req.keep_open = True
+		resp = self.criu.send_req(req)
+		print "   `-", resp.success
+		return resp.success
+
+	def rpc_restore_from_images(self):
+		print "Restoring from images"
+		self.htype.put_meta_images(self.img.image_dir())
+
+		req = cr_rpc.criu_req()
+		req.type = cr_rpc.RESTORE
+		req.opts.images_dir_fd = self.img.image_dir_fd()
+		req.opts.work_dir_fd = self.img.work_dir_fd()
+		req.opts.notify_scripts = True
+
+		if self.htype.can_migrate_tcp():
+			req.opts.tcp_established = True
+
+		for veth in self.htype.veths():
+			v = req.opts.veths.add()
+			v.if_in = veth.name
+			v.if_out = veth.pair
+
+		nroot = self.htype.mount()
+		if nroot:
+			req.opts.root = nroot
+			print "Restore root set to %s" % req.opts.root
+
+		cc = self.criu
+		resp = cc.send_req(req)
+		while True:
+			if resp.type == cr_rpc.NOTIFY:
+				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
+				if resp.notify.script == "setup-namespaces":
+					#
+					# At that point we have only one task
+					# living in namespaces and waiting for
+					# us to ACK the notify. Htype might want
+					# to configure namespace (external net
+					# devices) and cgroups
+					#
+					self.htype.prepare_ct(resp.notify.pid)
+				elif resp.notify.script == "network-unlock":
+					self.htype.net_unlock()
+				elif resp.notify.script == "network-lock":
+					raise Exception("Locking network on restore?")
+
+				resp = cc.ack_notify()
+				continue
+
+			if not resp.success:
+				raise Exception("Restore failed")
+
+			print "Restore succeeded"
+			break
+
+		self.htype.restored(resp.restore.pid)
+		self.restored = True
+
+	def rpc_restore_time(self):
+		stats = criu_api.criu_get_rstats(self.img)
+		return stats.restore_time
diff --git a/phaul/p_haul_type.py b/phaul/p_haul_type.py
new file mode 100644
index 0000000..6ac093d
--- /dev/null
+++ b/phaul/p_haul_type.py
@@ -0,0 +1,33 @@
+#
+# Haulers' type selector
+# Types (htypes) are classes that help haul processes.
+# See p_haul_pid for comments on what such a class should look like.
+#
+
+import p_haul_ovz
+import p_haul_pid
+import p_haul_lxc
+
+haul_types = {
+	p_haul_ovz.name: p_haul_ovz,
+	p_haul_pid.name: p_haul_pid,
+	p_haul_lxc.name: p_haul_lxc,
+}
+
+def __get(id):
+	if haul_types.has_key(id[0]):
+		h_type = haul_types[id[0]]
+		return h_type.p_haul_type(id[1])
+	else:
+		print "Unknown type. Try one of", haul_types.keys()
+	 	return None
+
+def get_src(id):
+	ht = __get(id)
+	ht.init_src()
+	return ht
+
+def get_dst(id):
+	ht = __get(id)
+	ht.init_dst()
+	return ht
diff --git a/phaul/util.py b/phaul/util.py
new file mode 100644
index 0000000..9883bef
--- /dev/null
+++ b/phaul/util.py
@@ -0,0 +1,46 @@
+import os
+import fcntl
+import errno
+
+class net_dev:
+	def init(self):
+		self.name = None
+		self.pair = None
+		self.link = None
+
+def path_to_fs(path):
+	dev = os.stat(path)
+	dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
+	mfd = open("/proc/self/mountinfo")
+	for ln in mfd:
+		ln_p = ln.split(None, 9)
+		if dev_str == ln_p[2]:
+			return ln_p[8]
+
+	return None
+
+def ifup(ifname):
+	print "\t\tUpping %s" % ifname
+	os.system("ip link set %s up" % ifname)
+
+def ifdown(ifname):
+	print "\t\tDowning %s" % ifname
+	os.system("ip link set %s down" % ifname)
+
+def bridge_add(ifname, brname):
+	print "\t\tAdd %s to %s" % (ifname, brname)
+	os.system("brctl addif %s %s" % (brname, ifname))
+
+def set_cloexec(sk):
+	fd = sk.fileno()
+	flags = fcntl.fcntl(sk, fcntl.F_GETFD)
+	fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+def makedirs(dirpath):
+	try:
+		os.makedirs(dirpath)
+	except OSError as er:
+		if er.errno == errno.EEXIST and os.path.isdir(dirpath):
+			pass
+		else:
+			raise
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
new file mode 100644
index 0000000..8bbdac7
--- /dev/null
+++ b/phaul/xem_rpc.py
@@ -0,0 +1,198 @@
+import socket
+import select
+import threading
+import traceback
+import util
+import struct
+
+rpc_port = 12345
+rpc_sk_buf = 256
+
+RPC_CMD = 1
+RPC_CALL = 2
+
+RPC_RESP = 1
+RPC_EXC = 2
+
+#
+# Client
+#
+
+class _rpc_proxy_caller:
+	def __init__(self, sk, typ, fname):
+		self._rpc_sk = sk
+		self._fn_typ = typ
+		self._fn_name = fname
+
+	def __call__(self, *args):
+		call = (self._fn_typ, self._fn_name, args)
+		raw_data = repr(call)
+		self._rpc_sk.send(raw_data)
+		raw_data = self._rpc_sk.recv(rpc_sk_buf)
+		resp = eval(raw_data)
+
+		if resp[0] == RPC_RESP:
+			return resp[1]
+		elif resp[0] == RPC_EXC:
+			print "Remote exception"
+			raise Exception(resp[1])
+		else:
+			raise Exception("Proto resp error")
+
+class rpc_proxy:
+	def __init__(self, conn, *args):
+		self._srv = conn
+		self._rpc_sk = self._make_sk()
+		util.set_cloexec(self._rpc_sk)
+		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
+
+	def __getattr__(self, attr):
+		return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
+
+	def _make_sk(self):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.connect((self._srv, rpc_port))
+		return sk
+
+	def open_socket(self, uname):
+		sk = self._make_sk()
+		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
+		c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
+		c(host, uname)
+		return sk
+
+
+
+#
+# Server
+#
+
+class _rpc_server_sk:
+	def __init__(self, sk):
+		self._sk = sk
+		self._master = None
+
+	def fileno(self):
+		return self._sk.fileno()
+
+	def hash_name(self):
+		return self._sk.getpeername()
+
+	def get_name(self, mgr):
+		return self.hash_name()
+
+	def work(self, mgr):
+		raw_data = self._sk.recv(rpc_sk_buf)
+		if not raw_data:
+			mgr.remove(self)
+			if self._master:
+				self._master.on_disconnect()
+			self._sk.close()
+			return
+
+		data = eval(raw_data)
+		try:
+			if data[0] == RPC_CALL:
+				if not self._master:
+					raise Exception("Proto seq error")
+
+				res = getattr(self._master, "rpc_" + data[1])(*data[2])
+			elif data[0] == RPC_CMD:
+				res = getattr(self, data[1])(mgr, *data[2])
+			else:
+				raise Exception(("Proto typ error", data[0]))
+		except Exception as e:
+			traceback.print_exc()
+			res = (RPC_EXC, e)
+		else:
+			res = (RPC_RESP, res)
+
+		raw_data = repr(res)
+		self._sk.send(raw_data)
+
+	def init_rpc(self, mgr, args):
+		util.set_cloexec(self)
+		self._master = mgr.make_master()
+		self._master.on_connect(*args)
+
+	def pick_channel(self, mgr, hash_name, uname):
+		sk = mgr.pick_sk(hash_name)
+		if sk:
+			self._master.on_socket_open(sk._sk, uname)
+
+class _rpc_server_ask:
+	def __init__(self):
+		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sk.bind(("0.0.0.0", rpc_port))
+		sk.listen(8)
+		self._sk = sk
+		util.set_cloexec(self)
+
+	def fileno(self):
+		return self._sk.fileno()
+
+	def work(self, mgr):
+		sk, addr = self._sk.accept()
+		mgr.add(_rpc_server_sk(sk))
+
+class _rpc_stop_fd:
+	def __init__(self, fd):
+		self._fd = fd
+
+	def fileno(self):
+		return self._fd.fileno()
+
+	def work(self, mgr):
+		mgr.stop()
+
+class _rpc_server_manager:
+	def __init__(self, srv_class):
+		self._srv_class = srv_class
+		self._sk_by_name = {}
+		self._poll_list = [_rpc_server_ask()]
+		self._alive = True
+
+	def add(self, sk):
+		self._sk_by_name[sk.hash_name()] = sk
+		self._poll_list.append(sk)
+
+	def remove(self, sk):
+		self._sk_by_name.pop(sk.hash_name())
+		self._poll_list.remove(sk)
+
+	def pick_sk(self, hash_name):
+		sk = self._sk_by_name.pop(hash_name, None)
+		if sk:
+			self._poll_list.remove(sk)
+		return sk
+
+	def make_master(self):
+		return self._srv_class()
+
+	def stop(self):
+		self._alive = False
+
+	def loop(self, stop_fd):
+		if stop_fd:
+			self._poll_list.append(_rpc_stop_fd(stop_fd))
+
+		while self._alive:
+			r, w, x = select.select(self._poll_list, [], [])
+			for sk in r:
+				sk.work(self)
+
+		print "RPC Service stops"
+
+class rpc_threaded_srv(threading.Thread):
+	def __init__(self, srv_class):
+		threading.Thread.__init__(self)
+		self._mgr = _rpc_server_manager(srv_class)
+		self._stop_fd = None
+
+	def run(self):
+		self._mgr.loop(self._stop_fd)
+
+	def get_stop_fd(self):
+		sks = socket.socketpair()
+		self._stop_fd = sks[0]
+		return sks[1]
diff --git a/util.py b/util.py
deleted file mode 100644
index 9883bef..0000000
--- a/util.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import fcntl
-import errno
-
-class net_dev:
-	def init(self):
-		self.name = None
-		self.pair = None
-		self.link = None
-
-def path_to_fs(path):
-	dev = os.stat(path)
-	dev_str = "%d:%d" % (os.major(dev.st_dev), os.minor(dev.st_dev))
-	mfd = open("/proc/self/mountinfo")
-	for ln in mfd:
-		ln_p = ln.split(None, 9)
-		if dev_str == ln_p[2]:
-			return ln_p[8]
-
-	return None
-
-def ifup(ifname):
-	print "\t\tUpping %s" % ifname
-	os.system("ip link set %s up" % ifname)
-
-def ifdown(ifname):
-	print "\t\tDowning %s" % ifname
-	os.system("ip link set %s down" % ifname)
-
-def bridge_add(ifname, brname):
-	print "\t\tAdd %s to %s" % (ifname, brname)
-	os.system("brctl addif %s %s" % (brname, ifname))
-
-def set_cloexec(sk):
-	fd = sk.fileno()
-	flags = fcntl.fcntl(sk, fcntl.F_GETFD)
-	fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-
-def makedirs(dirpath):
-	try:
-		os.makedirs(dirpath)
-	except OSError as er:
-		if er.errno == errno.EEXIST and os.path.isdir(dirpath):
-			pass
-		else:
-			raise
diff --git a/xem_rpc.py b/xem_rpc.py
deleted file mode 100644
index 8bbdac7..0000000
--- a/xem_rpc.py
+++ /dev/null
@@ -1,198 +0,0 @@
-import socket
-import select
-import threading
-import traceback
-import util
-import struct
-
-rpc_port = 12345
-rpc_sk_buf = 256
-
-RPC_CMD = 1
-RPC_CALL = 2
-
-RPC_RESP = 1
-RPC_EXC = 2
-
-#
-# Client
-#
-
-class _rpc_proxy_caller:
-	def __init__(self, sk, typ, fname):
-		self._rpc_sk = sk
-		self._fn_typ = typ
-		self._fn_name = fname
-
-	def __call__(self, *args):
-		call = (self._fn_typ, self._fn_name, args)
-		raw_data = repr(call)
-		self._rpc_sk.send(raw_data)
-		raw_data = self._rpc_sk.recv(rpc_sk_buf)
-		resp = eval(raw_data)
-
-		if resp[0] == RPC_RESP:
-			return resp[1]
-		elif resp[0] == RPC_EXC:
-			print "Remote exception"
-			raise Exception(resp[1])
-		else:
-			raise Exception("Proto resp error")
-
-class rpc_proxy:
-	def __init__(self, conn, *args):
-		self._srv = conn
-		self._rpc_sk = self._make_sk()
-		util.set_cloexec(self._rpc_sk)
-		_rpc_proxy_caller(self._rpc_sk, RPC_CMD, "init_rpc")(args)
-
-	def __getattr__(self, attr):
-		return _rpc_proxy_caller(self._rpc_sk, RPC_CALL, attr)
-
-	def _make_sk(self):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.connect((self._srv, rpc_port))
-		return sk
-
-	def open_socket(self, uname):
-		sk = self._make_sk()
-		host = _rpc_proxy_caller(sk, RPC_CMD, "get_name")()
-		c = _rpc_proxy_caller(self._rpc_sk, RPC_CMD, "pick_channel")
-		c(host, uname)
-		return sk
-
-
-
-#
-# Server
-#
-
-class _rpc_server_sk:
-	def __init__(self, sk):
-		self._sk = sk
-		self._master = None
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def hash_name(self):
-		return self._sk.getpeername()
-
-	def get_name(self, mgr):
-		return self.hash_name()
-
-	def work(self, mgr):
-		raw_data = self._sk.recv(rpc_sk_buf)
-		if not raw_data:
-			mgr.remove(self)
-			if self._master:
-				self._master.on_disconnect()
-			self._sk.close()
-			return
-
-		data = eval(raw_data)
-		try:
-			if data[0] == RPC_CALL:
-				if not self._master:
-					raise Exception("Proto seq error")
-
-				res = getattr(self._master, "rpc_" + data[1])(*data[2])
-			elif data[0] == RPC_CMD:
-				res = getattr(self, data[1])(mgr, *data[2])
-			else:
-				raise Exception(("Proto typ error", data[0]))
-		except Exception as e:
-			traceback.print_exc()
-			res = (RPC_EXC, e)
-		else:
-			res = (RPC_RESP, res)
-
-		raw_data = repr(res)
-		self._sk.send(raw_data)
-
-	def init_rpc(self, mgr, args):
-		util.set_cloexec(self)
-		self._master = mgr.make_master()
-		self._master.on_connect(*args)
-
-	def pick_channel(self, mgr, hash_name, uname):
-		sk = mgr.pick_sk(hash_name)
-		if sk:
-			self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
-	def __init__(self):
-		sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sk.bind(("0.0.0.0", rpc_port))
-		sk.listen(8)
-		self._sk = sk
-		util.set_cloexec(self)
-
-	def fileno(self):
-		return self._sk.fileno()
-
-	def work(self, mgr):
-		sk, addr = self._sk.accept()
-		mgr.add(_rpc_server_sk(sk))
-
-class _rpc_stop_fd:
-	def __init__(self, fd):
-		self._fd = fd
-
-	def fileno(self):
-		return self._fd.fileno()
-
-	def work(self, mgr):
-		mgr.stop()
-
-class _rpc_server_manager:
-	def __init__(self, srv_class):
-		self._srv_class = srv_class
-		self._sk_by_name = {}
-		self._poll_list = [_rpc_server_ask()]
-		self._alive = True
-
-	def add(self, sk):
-		self._sk_by_name[sk.hash_name()] = sk
-		self._poll_list.append(sk)
-
-	def remove(self, sk):
-		self._sk_by_name.pop(sk.hash_name())
-		self._poll_list.remove(sk)
-
-	def pick_sk(self, hash_name):
-		sk = self._sk_by_name.pop(hash_name, None)
-		if sk:
-			self._poll_list.remove(sk)
-		return sk
-
-	def make_master(self):
-		return self._srv_class()
-
-	def stop(self):
-		self._alive = False
-
-	def loop(self, stop_fd):
-		if stop_fd:
-			self._poll_list.append(_rpc_stop_fd(stop_fd))
-
-		while self._alive:
-			r, w, x = select.select(self._poll_list, [], [])
-			for sk in r:
-				sk.work(self)
-
-		print "RPC Service stops"
-
-class rpc_threaded_srv(threading.Thread):
-	def __init__(self, srv_class):
-		threading.Thread.__init__(self)
-		self._mgr = _rpc_server_manager(srv_class)
-		self._stop_fd = None
-
-	def run(self):
-		self._mgr.loop(self._stop_fd)
-
-	def get_stop_fd(self):
-		sks = socket.socketpair()
-		self._stop_fd = sks[0]
-		return sks[1]
-- 
1.9.3



More information about the CRIU mailing list