[CRIU] [PATCH 4/7] p.haul: implement migration over existing connections
Nikita Spiridonov
nspiridonov at odin.com
Tue Oct 6 08:28:32 PDT 2015
Remove standalone mode; p.haul can now work only over existing
connections, which are specified as file descriptors via command-line
arguments.
Three arguments are required: --fdrpc for RPC calls, --fdmem for C/R
images migration and --fdfs for disk migration. Each file descriptor
is expected to refer to a socket opened in blocking mode with domain
AF_INET and type SOCK_STREAM.
The --to option is kept in p.haul for now, for disk migration via
rsync. It will soon become unused, at least for the vz module, which
will use ploop for disk migration.
Signed-off-by: Nikita Spiridonov <nspiridonov at odin.com>
---
p.haul | 51 +++++++++++++++++++------------
p.haul-service | 31 ++++++++++++-------
phaul/fs_haul_shared.py | 2 +-
phaul/fs_haul_subtree.py | 5 ++-
phaul/p_haul_connection.py | 44 +++++++++++++++++++++++++++
phaul/p_haul_iters.py | 16 +++++-----
phaul/p_haul_service.py | 29 ++++++++----------
phaul/util.py | 1 -
phaul/xem_rpc.py | 71 +++++++++++--------------------------------
phaul/xem_rpc_client.py | 18 +----------
10 files changed, 140 insertions(+), 128 deletions(-)
create mode 100644 phaul/p_haul_connection.py
diff --git a/p.haul b/p.haul
index aea7971..4d3877c 100755
--- a/p.haul
+++ b/p.haul
@@ -3,34 +3,39 @@
import sys
import argparse
import logging
-import phaul.p_haul_iters as ph_iters
-import phaul.images as ph_images
-import phaul.criu_api as ph_criu_api
-import phaul.xem_rpc as ph_xem_rpc
+import phaul.p_haul_iters
+import phaul.p_haul_connection
+import phaul.images
+import phaul.criu_api
+import phaul.xem_rpc
# Usage idea
-# p.haul <type> <id> <destination>
+# p.haul <type> <id> --fdrpc <fd> --fdmem <fd> --fdfs <fd>
#
-# E.g.
-#
-# # p.haul vz 100 10.0.0.1
-#
-# or
+# p.haul works over existing connections specified via command-line arguments
+# as file descriptors. Three arguments are required: --fdrpc for RPC calls,
+# --fdmem for c/r images migration and --fdfs for disk migration. For testing
+# purposes the p.haul-wrap helper script can be used; it establishes the
+# required connections with the target host and calls p.haul or p.haul-service.
#
-# # p.haul lxc myct 10.0.0.2
+# E.g.
+# p.haul vz 100 --fdrpc 3 --fdmem 4 --fdfs 5
+# p.haul lxc myct --fdrpc 3 --fdmem 4 --fdfs 5
#
parser = argparse.ArgumentParser("Process HAULer")
parser.add_argument("type", help="Type of hat to haul, e.g. vz or lxc")
parser.add_argument("id", help="ID of what to haul")
-parser.add_argument("to", help="IP where to haul")
-parser.add_argument("-v", help="Verbosity level", default=ph_criu_api.def_verb, type=int, dest="verbose")
+parser.add_argument("--to", help="IP where to haul")
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
+parser.add_argument("-v", help="Verbosity level", default=phaul.criu_api.def_verb, type=int, dest="verbose")
parser.add_argument("--keep-images", help="Keep images after migration", default=False, action='store_true')
parser.add_argument("--dst-rpid", help="Write pidfile on restore", default=None)
-parser.add_argument("--img-path", help="Dirctory where to put images", default=ph_images.def_path)
+parser.add_argument("--img-path", help="Dirctory where to put images", default=phaul.images.def_path)
parser.add_argument("--pid-root", help="Path to tree's FS root")
parser.add_argument("--force", help="Don't do any sanity (CPU compat) checks", default=False, action='store_true')
-parser.add_argument("--port", help="Port where to haul", type=int, default=ph_xem_rpc.rpc_port)
parser.add_argument("--log-file", help="Write logging messages to specified file")
args = parser.parse_args()
@@ -44,11 +49,17 @@ def log_uncaught_exception(type, value, traceback):
logging.error(value, exc_info=(type, value, traceback))
sys.excepthook = log_uncaught_exception
-args_dict = vars(args)
-ph_type = (args_dict.pop("type"), args_dict.pop("id"))
-dst = (args_dict.pop("to"), args_dict.pop("port"))
+logging.info("Starting p.haul")
+
+# Establish connection
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+ args.fdfs)
# Start the migration
-worker = ph_iters.phaul_iter_worker(ph_type, dst)
-worker.set_options(args_dict)
+ph_type = args.type, args.id
+worker = phaul.p_haul_iters.phaul_iter_worker(ph_type, connection)
+worker.set_options(vars(args))
worker.start_migration()
+
+# Close connection
+connection.close()
diff --git a/p.haul-service b/p.haul-service
index fbaa7ff..858b0ff 100755
--- a/p.haul-service
+++ b/p.haul-service
@@ -4,12 +4,14 @@ import sys
import signal
import argparse
import logging
-import phaul.xem_rpc as ph_xem_rpc
-import phaul.p_haul_service as ph_srv
+import phaul.xem_rpc
+import phaul.p_haul_service
+import phaul.p_haul_connection
parser = argparse.ArgumentParser("Process HAULer service server")
-parser.add_argument("--bind-addr", help="IP to bind to", type=str, default="0.0.0.0")
-parser.add_argument("--bind-port", help="Port to bind to", type=int, default=ph_xem_rpc.rpc_port)
+parser.add_argument("--fdrpc", help="File descriptor of rpc socket", type=int, required=True)
+parser.add_argument("--fdmem", help="File descriptor of memory socket", type=int, required=True)
+parser.add_argument("--fdfs", help="File descriptor of fs socket", type=int, required=True)
parser.add_argument("--log-file", help="Write logging messages to specified file")
args = parser.parse_args()
@@ -23,18 +25,22 @@ def log_uncaught_exception(type, value, traceback):
logging.error(value, exc_info=(type, value, traceback))
sys.excepthook = log_uncaught_exception
-host = (args.bind_addr, args.bind_port)
-
-sfd = None
+stop_fd = None
def fin(foo, bar):
logging.info("Stop by %d", foo)
- sfd.close()
+ stop_fd.close()
+
+logging.info("Starting p.haul service")
-logging.info("Starting p.haul rpyc service")
-t = ph_xem_rpc.rpc_threaded_srv(ph_srv.phaul_service, host)
+# Establish connection
+connection = phaul.p_haul_connection.establish(args.fdrpc, args.fdmem,
+ args.fdfs)
+
+t = phaul.xem_rpc.rpc_threaded_srv(phaul.p_haul_service.phaul_service,
+ connection)
# FIXME: Setup stop handlers
-sfd = t.get_stop_fd()
+stop_fd = t.init_stop_fd()
signal.signal(signal.SIGTERM, fin)
signal.signal(signal.SIGINT, fin)
@@ -42,3 +48,6 @@ t.start()
signal.pause()
t.join()
logging.info("Bye!")
+
+# Close connection
+connection.close()
diff --git a/phaul/fs_haul_shared.py b/phaul/fs_haul_shared.py
index d6b3a61..2212e0e 100644
--- a/phaul/fs_haul_shared.py
+++ b/phaul/fs_haul_shared.py
@@ -8,7 +8,7 @@ class p_haul_fs:
def __init__(self):
logging.info("Initilized shared FS hauler")
- def set_target_host(self, thost):
+ def set_options(self, opts):
pass
def set_work_dir(self, wdir):
diff --git a/phaul/fs_haul_subtree.py b/phaul/fs_haul_subtree.py
index 5d90cb9..a9bd559 100644
--- a/phaul/fs_haul_subtree.py
+++ b/phaul/fs_haul_subtree.py
@@ -14,9 +14,10 @@ class p_haul_fs:
def __init__(self, subtree_path):
logging.info("Initialized subtree FS hauler (%s)", subtree_path)
self.__root = subtree_path
+ self.__thost = None
- def set_target_host(self, thost):
- self.__thost = thost
+ def set_options(self, opts):
+ self.__thost = opts["to"]
def set_work_dir(self, wdir):
self.__wdir = wdir
diff --git a/phaul/p_haul_connection.py b/phaul/p_haul_connection.py
new file mode 100644
index 0000000..48b962a
--- /dev/null
+++ b/phaul/p_haul_connection.py
@@ -0,0 +1,44 @@
+#
+# The p.haul connection module contains the logic needed to establish the
+# connections between p.haul and p.haul-service.
+#
+
+import logging
+import socket
+import util
+
+class connection:
+ """p.haul connection
+
+ Encapsulates the sockets p.haul needs: the rpc socket (for RPC calls), the
+ memory socket (for c/r images migration) and the fs socket (for disk
+ migration). close() closes all three sockets.
+ """
+
+ def __init__(self, rpc_sk, mem_sk, fs_sk):
+ self.rpc_sk = rpc_sk
+ self.mem_sk = mem_sk
+ self.fs_sk = fs_sk
+
+ def close(self):
+ self.rpc_sk.close()
+ self.mem_sk.close()
+ self.fs_sk.close()
+
+def establish(fdrpc, fdmem, fdfs):
+ """Build a connection object from the rpc, memory and fs socket fds.
+
+ Each file descriptor is expected to refer to a socket opened in blocking
+ mode with domain AF_INET and type SOCK_STREAM (this is not checked here).
+ """
+
+ logging.info("Use existing connections, fdrpc=%d fdmem=%d fdfs=%d", fdrpc,
+ fdmem, fdfs)
+
+ rpc_sk = socket.fromfd(fdrpc, socket.AF_INET, socket.SOCK_STREAM)
+ mem_sk = socket.fromfd(fdmem, socket.AF_INET, socket.SOCK_STREAM)
+ fs_sk = socket.fromfd(fdfs, socket.AF_INET, socket.SOCK_STREAM)
+
+ util.set_cloexec(rpc_sk)  # NOTE(review): mem_sk/fs_sk are not marked CLOEXEC; presumably they must stay inheritable (e.g. for criu) — confirm
+
+ return connection(rpc_sk, mem_sk, fs_sk)
diff --git a/phaul/p_haul_iters.py b/phaul/p_haul_iters.py
index 6050236..0d929ca 100644
--- a/phaul/p_haul_iters.py
+++ b/phaul/p_haul_iters.py
@@ -23,14 +23,14 @@ phaul_iter_min_size = 64
phaul_iter_grow_max = 10
class phaul_iter_worker:
- def __init__(self, p_type, host):
- logging.info("Connecting to target host")
- self.target_host = xem_rpc_client.rpc_proxy(host)
- self.data_socket = self.target_host.open_socket("datask")
+ def __init__(self, p_type, connection):
+ self.connection = connection
+ self.target_host = xem_rpc_client.rpc_proxy(self.connection.rpc_sk)
logging.info("Setting up local")
+ self.criu_connection = criu_api.criu_conn(self.connection.mem_sk)
self.img = images.phaul_images("dmp")
- self.criu_connection = criu_api.criu_conn(self.data_socket)
+
self.htype = p_haul_type.get_src(p_type)
if not self.htype:
raise Exception("No htype driver found")
@@ -40,7 +40,6 @@ class phaul_iter_worker:
raise Exception("No FS driver found")
self.pid = self.htype.root_task_pid()
- self.fs.set_target_host(host[0])
logging.info("Setting up remote")
self.target_host.setup(p_type)
@@ -53,6 +52,7 @@ class phaul_iter_worker:
self.criu_connection.verbose(opts["verbose"])
self.img.set_options(opts)
self.htype.set_options(opts)
+ self.fs.set_options(opts)
self.__force = opts["force"]
def validate_cpu(self):
@@ -65,7 +65,7 @@ class phaul_iter_worker:
raise Exception("Can't dump cpuinfo")
logging.info("\t`- Sending CPU info")
- self.img.send_cpuinfo(self.target_host, self.data_socket)
+ self.img.send_cpuinfo(self.target_host, self.connection.mem_sk)
logging.info("\t`- Checking CPU info")
if not self.target_host.check_cpuinfo():
@@ -185,7 +185,7 @@ class phaul_iter_worker:
logging.info("Final FS and images sync")
self.fs.stop_migration()
self.img.sync_imgs_to_target(self.target_host, self.htype,
- self.data_socket)
+ self.connection.mem_sk)
logging.info("Asking target host to restore")
self.target_host.restore_from_images()
diff --git a/phaul/p_haul_service.py b/phaul/p_haul_service.py
index fd3c611..b3d4335 100644
--- a/phaul/p_haul_service.py
+++ b/phaul/p_haul_service.py
@@ -10,23 +10,24 @@ import criu_req
import p_haul_type
class phaul_service:
- def on_connect(self):
- logging.info("Connected")
- self.dump_iter = 0
- self.restored = False
+ def __init__(self, mem_sk, fs_sk):
+ self._mem_sk = mem_sk
+ self._fs_sk = fs_sk
+
self.criu_connection = None
- self.data_socket = None
self.img = None
self.htype = None
+ self.dump_iter_index = 0
+ self.restored = False
+
+ def on_connect(self):
+ logging.info("Connected")
def on_disconnect(self):
logging.info("Disconnected")
if self.criu_connection:
self.criu_connection.close()
- if self.data_socket:
- self.data_socket.close()
-
if self.htype and not self.restored:
self.htype.umount()
@@ -36,14 +37,10 @@ class phaul_service:
self.img.save_images()
self.img.close()
- def on_socket_open(self, sk, uname):
- self.data_socket = sk
- logging.info("Data socket (%s) accepted", uname)
-
def rpc_setup(self, htype_id):
logging.info("Setting up service side %s", htype_id)
self.img = images.phaul_images("rst")
- self.criu_connection = criu_api.criu_conn(self.data_socket)
+ self.criu_connection = criu_api.criu_conn(self._mem_sk)
self.htype = p_haul_type.get_dst(htype_id)
def rpc_set_options(self, opts):
@@ -52,7 +49,7 @@ class phaul_service:
self.htype.set_options(opts)
def start_page_server(self):
- logging.info("Starting page server for iter %d", self.dump_iter)
+ logging.info("Starting page server for iter %d", self.dump_iter_index)
logging.info("\tSending criu rpc req")
req = criu_req.make_page_server_req(self.htype, self.img,
@@ -64,7 +61,7 @@ class phaul_service:
logging.info("\tPage server started at %d", resp.ps.pid)
def rpc_start_iter(self):
- self.dump_iter += 1
+ self.dump_iter_index += 1
self.img.new_image_dir()
self.start_page_server()
@@ -72,7 +69,7 @@ class phaul_service:
pass
def rpc_start_accept_images(self, dir_id):
- self.img.start_accept_images(dir_id, self.data_socket)
+ self.img.start_accept_images(dir_id, self._mem_sk)
def rpc_stop_accept_images(self):
self.img.stop_accept_images()
diff --git a/phaul/util.py b/phaul/util.py
index b15c75f..1bfd6d1 100644
--- a/phaul/util.py
+++ b/phaul/util.py
@@ -33,7 +33,6 @@ def bridge_add(ifname, brname):
os.system("brctl addif %s %s" % (brname, ifname))
def set_cloexec(sk):
- fd = sk.fileno()
flags = fcntl.fcntl(sk, fcntl.F_GETFD)
fcntl.fcntl(sk, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
diff --git a/phaul/xem_rpc.py b/phaul/xem_rpc.py
index 92963d6..597bede 100644
--- a/phaul/xem_rpc.py
+++ b/phaul/xem_rpc.py
@@ -9,7 +9,6 @@ import traceback
import logging
import util
-rpc_port = 12345
rpc_sk_buf = 16384
RPC_CMD = 1
@@ -20,25 +19,18 @@ RPC_EXC = 2
class _rpc_server_sk:
def __init__(self, sk):
- self._sk = sk
+ self._rpc_sk = sk
self._master = None
def fileno(self):
- return self._sk.fileno()
-
- def hash_name(self):
- return self._sk.getpeername()
-
- def get_name(self, mgr):
- return self.hash_name()
+ return self._rpc_sk.fileno()
def work(self, mgr):
- raw_data = self._sk.recv(rpc_sk_buf)
+ raw_data = self._rpc_sk.recv(rpc_sk_buf)
if not raw_data:
- mgr.remove(self)
+ mgr.remove_poll_item(self)
if self._master:
self._master.on_disconnect()
- self._sk.close()
return
data = eval(raw_data)
@@ -59,33 +51,12 @@ class _rpc_server_sk:
res = (RPC_RESP, res)
raw_data = repr(res)
- self._sk.send(raw_data)
+ self._rpc_sk.send(raw_data)
def init_rpc(self, mgr, args):
- util.set_cloexec(self)
self._master = mgr.make_master()
self._master.on_connect(*args)
- def pick_channel(self, mgr, hash_name, uname):
- sk = mgr.pick_sk(hash_name)
- if sk:
- self._master.on_socket_open(sk._sk, uname)
-
-class _rpc_server_ask:
- def __init__(self, host):
- sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.bind(host)
- sk.listen(8)
- self._sk = sk
- util.set_cloexec(self)
-
- def fileno(self):
- return self._sk.fileno()
-
- def work(self, mgr):
- sk, addr = self._sk.accept()
- mgr.add(_rpc_server_sk(sk))
-
class _rpc_stop_fd:
def __init__(self, fd):
self._fd = fd
@@ -97,35 +68,29 @@ class _rpc_stop_fd:
mgr.stop()
class _rpc_server_manager:
- def __init__(self, srv_class, host):
+ def __init__(self, srv_class, connection):
self._srv_class = srv_class
- self._sk_by_name = {}
- self._poll_list = [_rpc_server_ask(host)]
+ self._connection = connection
+ self._poll_list = []
self._alive = True
- def add(self, sk):
- self._sk_by_name[sk.hash_name()] = sk
- self._poll_list.append(sk)
+ self.add_poll_item(_rpc_server_sk(connection.rpc_sk))
- def remove(self, sk):
- self._sk_by_name.pop(sk.hash_name())
- self._poll_list.remove(sk)
+ def add_poll_item(self, item):
+ self._poll_list.append(item)
- def pick_sk(self, hash_name):
- sk = self._sk_by_name.pop(hash_name, None)
- if sk:
- self._poll_list.remove(sk)
- return sk
+ def remove_poll_item(self, item):
+ self._poll_list.remove(item)
def make_master(self):
- return self._srv_class()
+ return self._srv_class(self._connection.mem_sk, self._connection.fs_sk)
def stop(self):
self._alive = False
def loop(self, stop_fd):
if stop_fd:
- self._poll_list.append(_rpc_stop_fd(stop_fd))
+ self.add_poll_item(_rpc_stop_fd(stop_fd))
while self._alive:
r, w, x = select.select(self._poll_list, [], [])
@@ -135,9 +100,9 @@ class _rpc_server_manager:
logging.info("RPC Service stops")
class rpc_threaded_srv(threading.Thread):
- def __init__(self, srv_class, host):
+ def __init__(self, srv_class, connection):
threading.Thread.__init__(self)
- self._mgr = _rpc_server_manager(srv_class, host)
+ self._mgr = _rpc_server_manager(srv_class, connection)
self._stop_fd = None
def run(self):
@@ -146,7 +111,7 @@ class rpc_threaded_srv(threading.Thread):
except:
logging.exception("Exception in rpc_threaded_srv")
- def get_stop_fd(self):
+ def init_stop_fd(self):
sks = socket.socketpair()
self._stop_fd = sks[0]
return sks[1]
diff --git a/phaul/xem_rpc_client.py b/phaul/xem_rpc_client.py
index 07246c0..c6fe8c3 100644
--- a/phaul/xem_rpc_client.py
+++ b/phaul/xem_rpc_client.py
@@ -29,24 +29,10 @@ class _rpc_proxy_caller:
raise Exception("Proto resp error")
class rpc_proxy:
- def __init__(self, conn, *args):
- self._srv = conn
- self._rpc_sk = self._make_sk()
- util.set_cloexec(self._rpc_sk)
+ def __init__(self, sk, *args):
+ self._rpc_sk = sk
c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "init_rpc")
c(args)
def __getattr__(self, attr):
return _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CALL, attr)
-
- def _make_sk(self):
- sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.connect(self._srv)
- return sk
-
- def open_socket(self, uname):
- sk = self._make_sk()
- host = _rpc_proxy_caller(sk, xem_rpc.RPC_CMD, "get_name")()
- c = _rpc_proxy_caller(self._rpc_sk, xem_rpc.RPC_CMD, "pick_channel")
- c(host, uname)
- return sk
--
1.7.1
More information about the CRIU
mailing list