[CRIU] [PATCH] [RFC] p.haul: close socket to the CRIU service if smth failed
Andrew Vagin
avagin at parallels.com
Mon Mar 24 06:19:20 PDT 2014
Sorry, I forgot to remove the debug messages (the `print` calls in `__enter__` and `__exit__`) from this patch.
On Mon, Mar 24, 2014 at 05:16:11PM +0400, Andrey Vagin wrote:
> Currently the socket to the CRIU service is not closed in error cases. For example:
>
> [src] $ ./p.haul ovz 1001 192.168.122.36
> Asking target host to restore
> ======= Remote traceback =======
> Traceback (most recent call last):
> File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 300, in _dispatch_request
> res = self._HANDLERS[handler](self, *args)
> File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 532, in _handle_call
> return self._local_objects[oid](*args, **dict(kwargs))
> File "/vz/lxc/p.haul/p_haul_service.py", line 122, in exposed_restore_from_images
> self.htype.net_unlock()
> File "/vz/lxc/p.haul/p_haul_ovz.py", line 177, in net_unlock
> netif.ifup(veth.pairs)
> AttributeError: net_dev instance has no attribute 'pairs'
>
> ======= Local exception ========
> Traceback (most recent call last):
> File "./p.haul", line 39, in <module>
> worker.start_migration()
> File "/vz/lxc/p.haul/p_haul_iters.py", line 202, in start_migration
> self.th.restore_from_images()
> File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 196, in __call__
> return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
> File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 71, in syncreq
> return conn.sync_request(handler, oid, *args)
> File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 438, in sync_request
> raise obj
> AttributeError: net_dev instance has no attribute 'pairs'
>
> [src] $ ps axf
> ...
> 912 ? Ss 0:00 init
> ...
>
> [dst] $ ps axf
> ...
> 28170 ? Ss 0:00 ./criu service -v4 -o criu.service.log --daemon
> 28293 ? S 0:00 \_ ./criu service -v4 -o criu.service.log --daemon
> 28295 ? Ss 0:00 \_ init
>
> [dst] # lsof -p 28293 | grep unix
> criu 28293 root 4u unix 0xffff8800b9a3d280 0t0 79903 /var/run/criu_service.socket
> [dst] # ss -x | grep 79903
> u_dgr ESTAB 0 0 /var/run/criu_service.socket 79903 * 79401
> u_dgr ESTAB 0 0 * 79401 * 79903
>
> Signed-off-by: Andrey Vagin <avagin at openvz.org>
> ---
> p_haul_criu.py | 10 ++-
> p_haul_iters.py | 212 +++++++++++++++++++++++++++---------------------------
> p_haul_service.py | 142 ++++++++++++++++++------------------
> 3 files changed, 185 insertions(+), 179 deletions(-)
>
> diff --git a/p_haul_criu.py b/p_haul_criu.py
> index ef40710..764a2bc 100644
> --- a/p_haul_criu.py
> +++ b/p_haul_criu.py
> @@ -24,10 +24,16 @@ def_verb = 2
> #
>
> class criu_conn:
> - def __init__(self):
> + def __enter__(self):
> + print "--------- __enter__ --------"
> self.cs = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
> self.cs.connect(criu_socket)
> self.verb = def_verb
> + return self
> +
> + def __exit__(self, type, value, traceback):
> + print "--------- __exit__ --------"
> + self.cs.close()
>
> def verbose(self, level):
> self.verb = level
> @@ -44,7 +50,7 @@ class criu_conn:
> resp.ParseFromString(self.cs.recv(1024))
> return resp
>
> - def ack_notify(self):
> + def ack_notify(self, success = True):
> req = cr_rpc.criu_req()
> req.type = cr_rpc.NOTIFY
> req.notify_success = True
> diff --git a/p_haul_iters.py b/p_haul_iters.py
> index 92797ea..c6cbfb0 100644
> --- a/p_haul_iters.py
> +++ b/p_haul_iters.py
> @@ -76,141 +76,141 @@ class phaul_iter_worker:
>
> def start_migration(self):
> print "Connecting to CRIU service"
> - cc = cr_api.criu_conn()
> - cc.verbose(self.verb)
> + with cr_api.criu_conn() as cc:
> + cc.verbose(self.verb)
>
> - start_time = time.time()
> - iter_times = []
> + start_time = time.time()
> + iter_times = []
>
> - print "Preliminary FS migration"
> - self.fs.set_work_dir(self.img.work_dir())
> - self.fs.start_migration()
> + print "Preliminary FS migration"
> + self.fs.set_work_dir(self.img.work_dir())
> + self.fs.start_migration()
>
> - print "Starting iterations"
> - while True:
> - print "* Iteration %d" % self.iteration
> + print "Starting iterations"
> + while True:
> + print "* Iteration %d" % self.iteration
>
> - self.th.start_iter()
> - self.img.new_image_dir()
> + self.th.start_iter()
> + self.img.new_image_dir()
>
> - print "\tIssuing pre-dump command to service"
> + print "\tIssuing pre-dump command to service"
>
> - req = self.make_dump_req(cr_rpc.PRE_DUMP)
> - resp = cc.send_req(req)
> - if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
> - raise Exception("Pre-dump failed")
> + req = self.make_dump_req(cr_rpc.PRE_DUMP)
> + resp = cc.send_req(req)
> + if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
> + raise Exception("Pre-dump failed")
>
> - print "\tPre-dump succeeded"
> + print "\tPre-dump succeeded"
>
> - self.th.end_iter()
> + self.th.end_iter()
>
> - stats = cr_api.criu_get_dstats(self.img)
> - print "Dumped %d pages, %d skipped" % \
> - (stats.pages_written, stats.pages_skipped_parent)
> + stats = cr_api.criu_get_dstats(self.img)
> + print "Dumped %d pages, %d skipped" % \
> + (stats.pages_written, stats.pages_skipped_parent)
>
> - iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
> - self.frozen_time += stats.frozen_time
> + iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
> + self.frozen_time += stats.frozen_time
>
> - #
> - # Need to decide whether we do next iteration
> - # or stop on the existing and go do full dump
> - # and restore
> - #
> -
> - print "Checking iteration progress:"
> -
> - if stats.pages_written <= phaul_iter_min_size:
> - print "\t> Small dump"
> - break;
> -
> - if self.prev_stats:
> - w_add = stats.pages_written - self.prev_stats.pages_written
> - w_add = w_add * 100 / self.prev_stats.pages_written
> - if w_add > phaul_iter_grow_max:
> - print "\t> Iteration grows"
> - break
> + #
> + # Need to decide whether we do next iteration
> + # or stop on the existing and go do full dump
> + # and restore
> + #
>
> - if self.iteration >= phaul_iter_max:
> - print "\t> Too many iterations"
> - break
> + print "Checking iteration progress:"
>
> - self.iteration += 1
> - self.prev_stats = stats
> - print "\t> Proceed to next iteration"
> + if stats.pages_written <= phaul_iter_min_size:
> + print "\t> Small dump"
> + break;
>
> - self.fs.next_iteration()
> + if self.prev_stats:
> + w_add = stats.pages_written - self.prev_stats.pages_written
> + w_add = w_add * 100 / self.prev_stats.pages_written
> + if w_add > phaul_iter_grow_max:
> + print "\t> Iteration grows"
> + break
>
> - #
> - # Finish with iterations -- do full dump, send images
> - # to target host and restore from them there
> - #
> + if self.iteration >= phaul_iter_max:
> + print "\t> Too many iterations"
> + break
>
> - print "Final dump and restore"
> + self.iteration += 1
> + self.prev_stats = stats
> + print "\t> Proceed to next iteration"
>
> - self.th.start_iter()
> - self.img.new_image_dir()
> + self.fs.next_iteration()
>
> - print "\tIssuing dump command to service"
> - req = self.make_dump_req(cr_rpc.DUMP)
> - req.opts.notify_scripts = True
> - req.opts.file_locks = True
> - req.opts.evasive_devices = True
> - req.opts.link_remap = True
> - if self.htype.can_migrate_tcp:
> - req.opts.tcp_established = True
> + #
> + # Finish with iterations -- do full dump, send images
> + # to target host and restore from them there
> + #
>
> - cc.send_req(req, False)
> + print "Final dump and restore"
>
> - while True:
> - resp = cc.recv_resp()
> - if resp.type != cr_rpc.NOTIFY:
> - if resp.type == cr_rpc.DUMP and not resp.success:
> - raise Exception("Dump failed")
> - else:
> - raise Exception("Unexpected responce from service (%d)" % resp.type)
> + self.th.start_iter()
> + self.img.new_image_dir()
>
> - if resp.notify.script == "post-dump":
> - #
> - # Dump is effectively over. Now CRIU
> - # waits for us to do whatever we want
> - # and keeps the tasks frozen.
> - #
> - break
> + print "\tIssuing dump command to service"
> + req = self.make_dump_req(cr_rpc.DUMP)
> + req.opts.notify_scripts = True
> + req.opts.file_locks = True
> + req.opts.evasive_devices = True
> + req.opts.link_remap = True
> + if self.htype.can_migrate_tcp:
> + req.opts.tcp_established = True
> +
> + cc.send_req(req, False)
> +
> + while True:
> + resp = cc.recv_resp()
> + if resp.type != cr_rpc.NOTIFY:
> + if resp.type == cr_rpc.DUMP and not resp.success:
> + raise Exception("Dump failed")
> + else:
> + raise Exception("Unexpected responce from service (%d)" % resp.type)
> +
> + if resp.notify.script == "post-dump":
> + #
> + # Dump is effectively over. Now CRIU
> + # waits for us to do whatever we want
> + # and keeps the tasks frozen.
> + #
> + break
>
> - elif resp.notify.script == "network-lock":
> - self.htype.net_lock()
> - elif resp.notify.script == "network-unlock":
> - self.htype.net_unlock()
> + elif resp.notify.script == "network-lock":
> + self.htype.net_lock()
> + elif resp.notify.script == "network-unlock":
> + self.htype.net_unlock()
>
> - print "\t\tNotify (%s)" % resp.notify.script
> - cc.ack_notify()
> + print "\t\tNotify (%s)" % resp.notify.script
> + cc.ack_notify()
>
> - print "Dump complete"
> - self.th.end_iter()
> + print "Dump complete"
> + self.th.end_iter()
>
> - #
> - # Dump is complete -- go to target node,
> - # restore them there and kill (if required)
> - # tasks on source node
> - #
> + #
> + # Dump is complete -- go to target node,
> + # restore them there and kill (if required)
> + # tasks on source node
> + #
>
> - print "Final FS and images sync"
> - self.fs.stop_migration()
> - self.img.sync_imgs_to_target(self.th, self.htype)
> + print "Final FS and images sync"
> + self.fs.stop_migration()
> + self.img.sync_imgs_to_target(self.th, self.htype)
>
> - print "Asking target host to restore"
> - self.th.restore_from_images()
> + print "Asking target host to restore"
> + self.th.restore_from_images()
>
> - #
> - # Ack the notify after restore -- CRIU would
> - # then terminate all tasks and send us back
> - # DUMP/success message
> - #
> + #
> + # Ack the notify after restore -- CRIU would
> + # then terminate all tasks and send us back
> + # DUMP/success message
> + #
>
> - cc.ack_notify()
> - resp = cc.recv_resp()
> - if resp.type != cr_rpc.DUMP:
> - raise Exception("Dump failed")
> + cc.ack_notify()
> + resp = cc.recv_resp()
> + if resp.type != cr_rpc.DUMP:
> + raise Exception("Dump failed")
>
> self.htype.umount()
>
> diff --git a/p_haul_service.py b/p_haul_service.py
> index ae243d6..3aae1b4 100644
> --- a/p_haul_service.py
> +++ b/p_haul_service.py
> @@ -45,26 +45,26 @@ class phaul_service(rpyc.Service):
>
> def start_page_server(self):
> print "Starting page server for iter %d" % self.dump_iter
> - cc = cr_api.criu_conn()
> - cc.verbose(self.verb)
> + with cr_api.criu_conn() as cc:
> + cc.verbose(self.verb)
>
> - req = cr_rpc.criu_req()
> - req.type = cr_rpc.PAGE_SERVER
> - req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
> + req = cr_rpc.criu_req()
> + req.type = cr_rpc.PAGE_SERVER
> + req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
>
> - req.opts.images_dir_fd = self.img.image_dir_fd()
> - req.opts.work_dir_fd = self.img.work_dir_fd()
> - p_img = self.img.prev_image_dir()
> - if p_img:
> - req.opts.parent_img = p_img
> + req.opts.images_dir_fd = self.img.image_dir_fd()
> + req.opts.work_dir_fd = self.img.work_dir_fd()
> + p_img = self.img.prev_image_dir()
> + if p_img:
> + req.opts.parent_img = p_img
>
> - print "\tSending criu rpc req"
> - resp = cc.send_req(req)
> - if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
> - raise Exception("Failed to start page server")
> + print "\tSending criu rpc req"
> + resp = cc.send_req(req)
> + if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
> + raise Exception("Failed to start page server")
>
> - self.page_server_pid = resp.ps.pid
> - print "\tPage server started at %d" % resp.ps.pid
> + self.page_server_pid = resp.ps.pid
> + print "\tPage server started at %d" % resp.ps.pid
>
> def exposed_start_iter(self):
> self.dump_iter += 1
> @@ -79,61 +79,61 @@ class phaul_service(rpyc.Service):
>
> def exposed_restore_from_images(self):
> print "Restoring from images"
> - cc = cr_api.criu_conn()
> - cc.verbose(self.verb)
> -
> - self.htype.put_meta_images(self.img.image_dir())
> -
> - req = cr_rpc.criu_req()
> - req.type = cr_rpc.RESTORE
> - req.opts.images_dir_fd = self.img.image_dir_fd()
> - req.opts.work_dir_fd = self.img.work_dir_fd()
> - req.opts.notify_scripts = True
> -
> - if self.htype.can_migrate_tcp():
> - req.opts.tcp_established = True
> -
> - for veth in self.htype.veths():
> - v = req.opts.veths.add()
> - v.if_in = veth.name
> - v.if_out = veth.pair
> -
> - nroot = self.htype.mount()
> - if nroot:
> - req.opts.root = nroot
> - print "Restore root set to %s" % req.opts.root
> -
> - cc.send_req(req, False)
> -
> - while True:
> - resp = cc.recv_resp()
> - if resp.type == cr_rpc.NOTIFY:
> - print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
> - if resp.notify.script == "setup-namespaces":
> - #
> - # At that point we have only one task
> - # living in namespaces and waiting for
> - # us to ACK the notify. Htype might want
> - # to configure namespace (external net
> - # devices) and cgroups
> - #
> - self.htype.prepare_ct(resp.notify.pid)
> - elif resp.notify.script == "network-unlock":
> - self.htype.net_unlock()
> - elif resp.notify.script == "network-lock":
> - raise Exception("Locking network on restore?")
> -
> - cc.ack_notify()
> - continue
> -
> - if resp.type != cr_rpc.RESTORE:
> - raise Exception("Unexpected responce from service (%d)" % resp.type)
> -
> - if not resp.success:
> - raise Exception("Restore failed")
> -
> - print "Restore succeeded"
> - break
> + with cr_api.criu_conn() as cc:
> + cc.verbose(self.verb)
> +
> + self.htype.put_meta_images(self.img.image_dir())
> +
> + req = cr_rpc.criu_req()
> + req.type = cr_rpc.RESTORE
> + req.opts.images_dir_fd = self.img.image_dir_fd()
> + req.opts.work_dir_fd = self.img.work_dir_fd()
> + req.opts.notify_scripts = True
> +
> + if self.htype.can_migrate_tcp():
> + req.opts.tcp_established = True
> +
> + for veth in self.htype.veths():
> + v = req.opts.veths.add()
> + v.if_in = veth.name
> + v.if_out = veth.pair
> +
> + nroot = self.htype.mount()
> + if nroot:
> + req.opts.root = nroot
> + print "Restore root set to %s" % req.opts.root
> +
> + cc.send_req(req, False)
> +
> + while True:
> + resp = cc.recv_resp()
> + if resp.type == cr_rpc.NOTIFY:
> + print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
> + if resp.notify.script == "setup-namespaces":
> + #
> + # At that point we have only one task
> + # living in namespaces and waiting for
> + # us to ACK the notify. Htype might want
> + # to configure namespace (external net
> + # devices) and cgroups
> + #
> + self.htype.prepare_ct(resp.notify.pid)
> + elif resp.notify.script == "network-unlock":
> + self.htype.net_unlock()
> + elif resp.notify.script == "network-lock":
> + raise Exception("Locking network on restore?")
> +
> + cc.ack_notify()
> + continue
> +
> + if resp.type != cr_rpc.RESTORE:
> + raise Exception("Unexpected responce from service (%d)" % resp.type)
> +
> + if not resp.success:
> + raise Exception("Restore failed")
> +
> + print "Restore succeeded"
> + break
>
> self.htype.restored(resp.restore.pid)
> self.restored = True
> --
> 1.8.5.3
>
More information about the CRIU
mailing list