[CRIU] [PATCH] [RFC] p.haul: close the socket to the CRIU service if something fails
Andrey Vagin
avagin at openvz.org
Mon Mar 24 06:26:34 PDT 2014
This patch implements the __enter__ and __exit__ methods for the
criu_conn class and uses the "with" statement to access objects
of this class, so the socket is closed even when an exception is raised.
"""
When the “with” statement is executed, Python evaluates the expression,
calls the __enter__ method on the resulting value (which is called a
“context guard”), and assigns whatever __enter__ returns to the variable
given by as. Python will then execute the code body, and no matter what
happens in that code, call the guard object’s __exit__ method.
"""
Currently the socket is not closed when an error occurs, so the connection to the CRIU service stays open on the destination host. For example:
[src] $ ./p.haul ovz 1001 192.168.122.36
Asking target host to restore
======= Remote traceback =======
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 300, in _dispatch_request
res = self._HANDLERS[handler](self, *args)
File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 532, in _handle_call
return self._local_objects[oid](*args, **dict(kwargs))
File "/vz/lxc/p.haul/p_haul_service.py", line 122, in exposed_restore_from_images
self.htype.net_unlock()
File "/vz/lxc/p.haul/p_haul_ovz.py", line 177, in net_unlock
netif.ifup(veth.pairs)
AttributeError: net_dev instance has no attribute 'pairs'
======= Local exception ========
Traceback (most recent call last):
File "./p.haul", line 39, in <module>
worker.start_migration()
File "/vz/lxc/p.haul/p_haul_iters.py", line 202, in start_migration
self.th.restore_from_images()
File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 196, in __call__
return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 71, in syncreq
return conn.sync_request(handler, oid, *args)
File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 438, in sync_request
raise obj
AttributeError: net_dev instance has no attribute 'pairs'
[src] $ ps axf
...
912 ? Ss 0:00 init
...
[dst] $ ps axf
...
28170 ? Ss 0:00 ./criu service -v4 -o criu.service.log --daemon
28293 ? S 0:00 \_ ./criu service -v4 -o criu.service.log --daemon
28295 ? Ss 0:00 \_ init
[dst] # lsof -p 28293 | grep unix
criu 28293 root 4u unix 0xffff8800b9a3d280 0t0 79903 /var/run/criu_service.socket
[dst] # ss -x | grep 79903
u_dgr ESTAB 0 0 /var/run/criu_service.socket 79903 * 79401
u_dgr ESTAB 0 0 * 79401 * 79903
Signed-off-by: Andrey Vagin <avagin at openvz.org>
---
p_haul_criu.py | 8 ++-
p_haul_iters.py | 212 +++++++++++++++++++++++++++---------------------------
p_haul_service.py | 142 ++++++++++++++++++------------------
3 files changed, 183 insertions(+), 179 deletions(-)
diff --git a/p_haul_criu.py b/p_haul_criu.py
index 21786c6..02e7e18 100644
--- a/p_haul_criu.py
+++ b/p_haul_criu.py
@@ -24,10 +24,14 @@ def_verb = 2
#
class criu_conn:
- def __init__(self):
+ def __enter__(self):
self.cs = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
self.cs.connect(criu_socket)
self.verb = def_verb
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.cs.close()
def verbose(self, level):
self.verb = level
@@ -44,7 +48,7 @@ class criu_conn:
resp.ParseFromString(self.cs.recv(1024))
return resp
- def ack_notify(self):
+ def ack_notify(self, success = True):
req = cr_rpc.criu_req()
req.type = cr_rpc.NOTIFY
- req.notify_success = True
+ req.notify_success = success
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 92797ea..c6cbfb0 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -76,141 +76,141 @@ class phaul_iter_worker:
def start_migration(self):
print "Connecting to CRIU service"
- cc = cr_api.criu_conn()
- cc.verbose(self.verb)
+ with cr_api.criu_conn() as cc:
+ cc.verbose(self.verb)
- start_time = time.time()
- iter_times = []
+ start_time = time.time()
+ iter_times = []
- print "Preliminary FS migration"
- self.fs.set_work_dir(self.img.work_dir())
- self.fs.start_migration()
+ print "Preliminary FS migration"
+ self.fs.set_work_dir(self.img.work_dir())
+ self.fs.start_migration()
- print "Starting iterations"
- while True:
- print "* Iteration %d" % self.iteration
+ print "Starting iterations"
+ while True:
+ print "* Iteration %d" % self.iteration
- self.th.start_iter()
- self.img.new_image_dir()
+ self.th.start_iter()
+ self.img.new_image_dir()
- print "\tIssuing pre-dump command to service"
+ print "\tIssuing pre-dump command to service"
- req = self.make_dump_req(cr_rpc.PRE_DUMP)
- resp = cc.send_req(req)
- if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
- raise Exception("Pre-dump failed")
+ req = self.make_dump_req(cr_rpc.PRE_DUMP)
+ resp = cc.send_req(req)
+ if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
+ raise Exception("Pre-dump failed")
- print "\tPre-dump succeeded"
+ print "\tPre-dump succeeded"
- self.th.end_iter()
+ self.th.end_iter()
- stats = cr_api.criu_get_dstats(self.img)
- print "Dumped %d pages, %d skipped" % \
- (stats.pages_written, stats.pages_skipped_parent)
+ stats = cr_api.criu_get_dstats(self.img)
+ print "Dumped %d pages, %d skipped" % \
+ (stats.pages_written, stats.pages_skipped_parent)
- iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
- self.frozen_time += stats.frozen_time
+ iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
+ self.frozen_time += stats.frozen_time
- #
- # Need to decide whether we do next iteration
- # or stop on the existing and go do full dump
- # and restore
- #
-
- print "Checking iteration progress:"
-
- if stats.pages_written <= phaul_iter_min_size:
- print "\t> Small dump"
- break;
-
- if self.prev_stats:
- w_add = stats.pages_written - self.prev_stats.pages_written
- w_add = w_add * 100 / self.prev_stats.pages_written
- if w_add > phaul_iter_grow_max:
- print "\t> Iteration grows"
- break
+ #
+ # Need to decide whether we do next iteration
+ # or stop on the existing and go do full dump
+ # and restore
+ #
- if self.iteration >= phaul_iter_max:
- print "\t> Too many iterations"
- break
+ print "Checking iteration progress:"
- self.iteration += 1
- self.prev_stats = stats
- print "\t> Proceed to next iteration"
+ if stats.pages_written <= phaul_iter_min_size:
+ print "\t> Small dump"
+ break;
- self.fs.next_iteration()
+ if self.prev_stats:
+ w_add = stats.pages_written - self.prev_stats.pages_written
+ w_add = w_add * 100 / self.prev_stats.pages_written
+ if w_add > phaul_iter_grow_max:
+ print "\t> Iteration grows"
+ break
- #
- # Finish with iterations -- do full dump, send images
- # to target host and restore from them there
- #
+ if self.iteration >= phaul_iter_max:
+ print "\t> Too many iterations"
+ break
- print "Final dump and restore"
+ self.iteration += 1
+ self.prev_stats = stats
+ print "\t> Proceed to next iteration"
- self.th.start_iter()
- self.img.new_image_dir()
+ self.fs.next_iteration()
- print "\tIssuing dump command to service"
- req = self.make_dump_req(cr_rpc.DUMP)
- req.opts.notify_scripts = True
- req.opts.file_locks = True
- req.opts.evasive_devices = True
- req.opts.link_remap = True
- if self.htype.can_migrate_tcp:
- req.opts.tcp_established = True
+ #
+ # Finish with iterations -- do full dump, send images
+ # to target host and restore from them there
+ #
- cc.send_req(req, False)
+ print "Final dump and restore"
- while True:
- resp = cc.recv_resp()
- if resp.type != cr_rpc.NOTIFY:
- if resp.type == cr_rpc.DUMP and not resp.success:
- raise Exception("Dump failed")
- else:
- raise Exception("Unexpected responce from service (%d)" % resp.type)
+ self.th.start_iter()
+ self.img.new_image_dir()
- if resp.notify.script == "post-dump":
- #
- # Dump is effectively over. Now CRIU
- # waits for us to do whatever we want
- # and keeps the tasks frozen.
- #
- break
+ print "\tIssuing dump command to service"
+ req = self.make_dump_req(cr_rpc.DUMP)
+ req.opts.notify_scripts = True
+ req.opts.file_locks = True
+ req.opts.evasive_devices = True
+ req.opts.link_remap = True
+ if self.htype.can_migrate_tcp:
+ req.opts.tcp_established = True
+
+ cc.send_req(req, False)
+
+ while True:
+ resp = cc.recv_resp()
+ if resp.type != cr_rpc.NOTIFY:
+ if resp.type == cr_rpc.DUMP and not resp.success:
+ raise Exception("Dump failed")
+ else:
+ raise Exception("Unexpected responce from service (%d)" % resp.type)
+
+ if resp.notify.script == "post-dump":
+ #
+ # Dump is effectively over. Now CRIU
+ # waits for us to do whatever we want
+ # and keeps the tasks frozen.
+ #
+ break
- elif resp.notify.script == "network-lock":
- self.htype.net_lock()
- elif resp.notify.script == "network-unlock":
- self.htype.net_unlock()
+ elif resp.notify.script == "network-lock":
+ self.htype.net_lock()
+ elif resp.notify.script == "network-unlock":
+ self.htype.net_unlock()
- print "\t\tNotify (%s)" % resp.notify.script
- cc.ack_notify()
+ print "\t\tNotify (%s)" % resp.notify.script
+ cc.ack_notify()
- print "Dump complete"
- self.th.end_iter()
+ print "Dump complete"
+ self.th.end_iter()
- #
- # Dump is complete -- go to target node,
- # restore them there and kill (if required)
- # tasks on source node
- #
+ #
+ # Dump is complete -- go to target node,
+ # restore them there and kill (if required)
+ # tasks on source node
+ #
- print "Final FS and images sync"
- self.fs.stop_migration()
- self.img.sync_imgs_to_target(self.th, self.htype)
+ print "Final FS and images sync"
+ self.fs.stop_migration()
+ self.img.sync_imgs_to_target(self.th, self.htype)
- print "Asking target host to restore"
- self.th.restore_from_images()
+ print "Asking target host to restore"
+ self.th.restore_from_images()
- #
- # Ack the notify after restore -- CRIU would
- # then terminate all tasks and send us back
- # DUMP/success message
- #
+ #
+ # Ack the notify after restore -- CRIU would
+ # then terminate all tasks and send us back
+ # DUMP/success message
+ #
- cc.ack_notify()
- resp = cc.recv_resp()
- if resp.type != cr_rpc.DUMP:
- raise Exception("Dump failed")
+ cc.ack_notify()
+ resp = cc.recv_resp()
+ if resp.type != cr_rpc.DUMP:
+ raise Exception("Dump failed")
self.htype.umount()
diff --git a/p_haul_service.py b/p_haul_service.py
index ae243d6..3aae1b4 100644
--- a/p_haul_service.py
+++ b/p_haul_service.py
@@ -45,26 +45,26 @@ class phaul_service(rpyc.Service):
def start_page_server(self):
print "Starting page server for iter %d" % self.dump_iter
- cc = cr_api.criu_conn()
- cc.verbose(self.verb)
+ with cr_api.criu_conn() as cc:
+ cc.verbose(self.verb)
- req = cr_rpc.criu_req()
- req.type = cr_rpc.PAGE_SERVER
- req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
+ req = cr_rpc.criu_req()
+ req.type = cr_rpc.PAGE_SERVER
+ req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
- req.opts.images_dir_fd = self.img.image_dir_fd()
- req.opts.work_dir_fd = self.img.work_dir_fd()
- p_img = self.img.prev_image_dir()
- if p_img:
- req.opts.parent_img = p_img
+ req.opts.images_dir_fd = self.img.image_dir_fd()
+ req.opts.work_dir_fd = self.img.work_dir_fd()
+ p_img = self.img.prev_image_dir()
+ if p_img:
+ req.opts.parent_img = p_img
- print "\tSending criu rpc req"
- resp = cc.send_req(req)
- if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
- raise Exception("Failed to start page server")
+ print "\tSending criu rpc req"
+ resp = cc.send_req(req)
+ if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
+ raise Exception("Failed to start page server")
- self.page_server_pid = resp.ps.pid
- print "\tPage server started at %d" % resp.ps.pid
+ self.page_server_pid = resp.ps.pid
+ print "\tPage server started at %d" % resp.ps.pid
def exposed_start_iter(self):
self.dump_iter += 1
@@ -79,61 +79,61 @@ class phaul_service(rpyc.Service):
def exposed_restore_from_images(self):
print "Restoring from images"
- cc = cr_api.criu_conn()
- cc.verbose(self.verb)
-
- self.htype.put_meta_images(self.img.image_dir())
-
- req = cr_rpc.criu_req()
- req.type = cr_rpc.RESTORE
- req.opts.images_dir_fd = self.img.image_dir_fd()
- req.opts.work_dir_fd = self.img.work_dir_fd()
- req.opts.notify_scripts = True
-
- if self.htype.can_migrate_tcp():
- req.opts.tcp_established = True
-
- for veth in self.htype.veths():
- v = req.opts.veths.add()
- v.if_in = veth.name
- v.if_out = veth.pair
-
- nroot = self.htype.mount()
- if nroot:
- req.opts.root = nroot
- print "Restore root set to %s" % req.opts.root
-
- cc.send_req(req, False)
-
- while True:
- resp = cc.recv_resp()
- if resp.type == cr_rpc.NOTIFY:
- print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
- if resp.notify.script == "setup-namespaces":
- #
- # At that point we have only one task
- # living in namespaces and waiting for
- # us to ACK the notify. Htype might want
- # to configure namespace (external net
- # devices) and cgroups
- #
- self.htype.prepare_ct(resp.notify.pid)
- elif resp.notify.script == "network-unlock":
- self.htype.net_unlock()
- elif resp.notify.script == "network-lock":
- raise Exception("Locking network on restore?")
-
- cc.ack_notify()
- continue
-
- if resp.type != cr_rpc.RESTORE:
- raise Exception("Unexpected responce from service (%d)" % resp.type)
-
- if not resp.success:
- raise Exception("Restore failed")
-
- print "Restore succeeded"
- break
+ with cr_api.criu_conn() as cc:
+ cc.verbose(self.verb)
+
+ self.htype.put_meta_images(self.img.image_dir())
+
+ req = cr_rpc.criu_req()
+ req.type = cr_rpc.RESTORE
+ req.opts.images_dir_fd = self.img.image_dir_fd()
+ req.opts.work_dir_fd = self.img.work_dir_fd()
+ req.opts.notify_scripts = True
+
+ if self.htype.can_migrate_tcp():
+ req.opts.tcp_established = True
+
+ for veth in self.htype.veths():
+ v = req.opts.veths.add()
+ v.if_in = veth.name
+ v.if_out = veth.pair
+
+ nroot = self.htype.mount()
+ if nroot:
+ req.opts.root = nroot
+ print "Restore root set to %s" % req.opts.root
+
+ cc.send_req(req, False)
+
+ while True:
+ resp = cc.recv_resp()
+ if resp.type == cr_rpc.NOTIFY:
+ print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
+ if resp.notify.script == "setup-namespaces":
+ #
+ # At that point we have only one task
+ # living in namespaces and waiting for
+ # us to ACK the notify. Htype might want
+ # to configure namespace (external net
+ # devices) and cgroups
+ #
+ self.htype.prepare_ct(resp.notify.pid)
+ elif resp.notify.script == "network-unlock":
+ self.htype.net_unlock()
+ elif resp.notify.script == "network-lock":
+ raise Exception("Locking network on restore?")
+
+ cc.ack_notify()
+ continue
+
+ if resp.type != cr_rpc.RESTORE:
+ raise Exception("Unexpected responce from service (%d)" % resp.type)
+
+ if not resp.success:
+ raise Exception("Restore failed")
+
+ print "Restore succeeded"
+ break
self.htype.restored(resp.restore.pid)
self.restored = True
--
1.8.5.3
More information about the CRIU
mailing list