[CRIU] [PATCH] [RFC] p.haul: close socket to the CRIU service if something failed

Andrey Vagin avagin at openvz.org
Mon Mar 24 06:26:34 PDT 2014


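This patch implements the __enter__ and __exit__ methods for the
criu_conn class and converts all users of this class to the "with"
statement, so that the socket to the CRIU service is closed even when
an exception is raised while the connection is in use.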

"""
When the “with” statement is executed, Python evaluates the expression,
calls the __enter__ method on the resulting value (which is called a
“context guard”), and assigns whatever __enter__ returns to the variable
given by as. Python will then execute the code body, and no matter what
happens in that code, call the guard object’s __exit__ method.
"""

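Currently this socket is not closed when an error occurs. For example,
after a failed restore the criu service process on the target host still
holds the connection open: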

[src] $ ./p.haul ovz 1001 192.168.122.36
Asking target host to restore
======= Remote traceback =======
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 300, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 532, in _handle_call
    return self._local_objects[oid](*args, **dict(kwargs))
  File "/vz/lxc/p.haul/p_haul_service.py", line 122, in exposed_restore_from_images
    self.htype.net_unlock()
  File "/vz/lxc/p.haul/p_haul_ovz.py", line 177, in net_unlock
    netif.ifup(veth.pairs)
AttributeError: net_dev instance has no attribute 'pairs'

======= Local exception ========
Traceback (most recent call last):
  File "./p.haul", line 39, in <module>
    worker.start_migration()
  File "/vz/lxc/p.haul/p_haul_iters.py", line 202, in start_migration
    self.th.restore_from_images()
  File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 196, in __call__
    return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
  File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 71, in syncreq
    return conn.sync_request(handler, oid, *args)
  File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 438, in sync_request
    raise obj
AttributeError: net_dev instance has no attribute 'pairs'

[src] $ ps axf
...
  912 ?        Ss     0:00 init
...

[dst] $ ps axf
...
28170 ?        Ss     0:00 ./criu service -v4 -o criu.service.log --daemon
28293 ?        S      0:00  \_ ./criu service -v4 -o criu.service.log --daemon
28295 ?        Ss     0:00      \_ init

[dst] # lsof -p 28293 | grep unix
criu    28293 root    4u  unix 0xffff8800b9a3d280      0t0      79903 /var/run/criu_service.socket
[dst] # ss -x | grep 79903
u_dgr  ESTAB      0      0      /var/run/criu_service.socket 79903                 * 79401
u_dgr  ESTAB      0      0                    * 79401                 * 79903

Signed-off-by: Andrey Vagin <avagin at openvz.org>
---
 p_haul_criu.py    |   8 ++-
 p_haul_iters.py   | 212 +++++++++++++++++++++++++++---------------------------
 p_haul_service.py | 142 ++++++++++++++++++------------------
 3 files changed, 183 insertions(+), 179 deletions(-)

diff --git a/p_haul_criu.py b/p_haul_criu.py
index 21786c6..02e7e18 100644
--- a/p_haul_criu.py
+++ b/p_haul_criu.py
@@ -24,10 +24,14 @@ def_verb = 2
 #
 
 class criu_conn:
-	def __init__(self):
+	def __enter__(self):
 		self.cs = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
 		self.cs.connect(criu_socket)
 		self.verb = def_verb
+		return self
+
+	def __exit__(self, type, value, traceback):
+		self.cs.close()
 
 	def verbose(self, level):
 		self.verb = level
@@ -44,7 +48,7 @@ class criu_conn:
 		resp.ParseFromString(self.cs.recv(1024))
 		return resp
 
-	def ack_notify(self):
+	def ack_notify(self, success = True):
 		req = cr_rpc.criu_req()
 		req.type = cr_rpc.NOTIFY
 		req.notify_success = True
diff --git a/p_haul_iters.py b/p_haul_iters.py
index 92797ea..c6cbfb0 100644
--- a/p_haul_iters.py
+++ b/p_haul_iters.py
@@ -76,141 +76,141 @@ class phaul_iter_worker:
 
 	def start_migration(self):
 		print "Connecting to CRIU service"
-		cc = cr_api.criu_conn()
-		cc.verbose(self.verb)
+		with cr_api.criu_conn() as cc:
+			cc.verbose(self.verb)
 
-		start_time = time.time()
-		iter_times = []
+			start_time = time.time()
+			iter_times = []
 
-		print "Preliminary FS migration"
-		self.fs.set_work_dir(self.img.work_dir())
-		self.fs.start_migration()
+			print "Preliminary FS migration"
+			self.fs.set_work_dir(self.img.work_dir())
+			self.fs.start_migration()
 
-		print "Starting iterations"
-		while True:
-			print "* Iteration %d" % self.iteration
+			print "Starting iterations"
+			while True:
+				print "* Iteration %d" % self.iteration
 
-			self.th.start_iter()
-			self.img.new_image_dir()
+				self.th.start_iter()
+				self.img.new_image_dir()
 
-			print "\tIssuing pre-dump command to service"
+				print "\tIssuing pre-dump command to service"
 
-			req = self.make_dump_req(cr_rpc.PRE_DUMP)
-			resp = cc.send_req(req)
-			if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
-				raise Exception("Pre-dump failed")
+				req = self.make_dump_req(cr_rpc.PRE_DUMP)
+				resp = cc.send_req(req)
+				if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
+					raise Exception("Pre-dump failed")
 
-			print "\tPre-dump succeeded"
+				print "\tPre-dump succeeded"
 
-			self.th.end_iter()
+				self.th.end_iter()
 
-			stats = cr_api.criu_get_dstats(self.img)
-			print "Dumped %d pages, %d skipped" % \
-					(stats.pages_written, stats.pages_skipped_parent)
+				stats = cr_api.criu_get_dstats(self.img)
+				print "Dumped %d pages, %d skipped" % \
+						(stats.pages_written, stats.pages_skipped_parent)
 
-			iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
-			self.frozen_time += stats.frozen_time
+				iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
+				self.frozen_time += stats.frozen_time
 
-			#
-			# Need to decide whether we do next iteration
-			# or stop on the existing and go do full dump
-			# and restore
-			#
-
-			print "Checking iteration progress:"
-
-			if stats.pages_written <= phaul_iter_min_size:
-				print "\t> Small dump"
-				break;
-
-			if self.prev_stats:
-				w_add = stats.pages_written - self.prev_stats.pages_written
-				w_add = w_add * 100 / self.prev_stats.pages_written
-				if w_add > phaul_iter_grow_max:
-					print "\t> Iteration grows"
-					break
+				#
+				# Need to decide whether we do next iteration
+				# or stop on the existing and go do full dump
+				# and restore
+				#
 
-			if self.iteration >= phaul_iter_max:
-				print "\t> Too many iterations"
-				break
+				print "Checking iteration progress:"
 
-			self.iteration += 1
-			self.prev_stats = stats
-			print "\t> Proceed to next iteration"
+				if stats.pages_written <= phaul_iter_min_size:
+					print "\t> Small dump"
+					break;
 
-			self.fs.next_iteration()
+				if self.prev_stats:
+					w_add = stats.pages_written - self.prev_stats.pages_written
+					w_add = w_add * 100 / self.prev_stats.pages_written
+					if w_add > phaul_iter_grow_max:
+						print "\t> Iteration grows"
+						break
 
-		#
-		# Finish with iterations -- do full dump, send images
-		# to target host and restore from them there
-		#
+				if self.iteration >= phaul_iter_max:
+					print "\t> Too many iterations"
+					break
 
-		print "Final dump and restore"
+				self.iteration += 1
+				self.prev_stats = stats
+				print "\t> Proceed to next iteration"
 
-		self.th.start_iter()
-		self.img.new_image_dir()
+				self.fs.next_iteration()
 
-		print "\tIssuing dump command to service"
-		req = self.make_dump_req(cr_rpc.DUMP)
-		req.opts.notify_scripts = True
-		req.opts.file_locks = True
-		req.opts.evasive_devices = True
-		req.opts.link_remap = True
-		if self.htype.can_migrate_tcp:
-			req.opts.tcp_established = True
+			#
+			# Finish with iterations -- do full dump, send images
+			# to target host and restore from them there
+			#
 
-		cc.send_req(req, False)
+			print "Final dump and restore"
 
-		while True:
-			resp = cc.recv_resp()
-			if resp.type != cr_rpc.NOTIFY:
-				if resp.type == cr_rpc.DUMP and not resp.success:
-					raise Exception("Dump failed")
-				else:
-					raise Exception("Unexpected responce from service (%d)" % resp.type)
+			self.th.start_iter()
+			self.img.new_image_dir()
 
-			if resp.notify.script == "post-dump":
-				#
-				# Dump is effectively over. Now CRIU
-				# waits for us to do whatever we want
-				# and keeps the tasks frozen.
-				#
-				break
+			print "\tIssuing dump command to service"
+			req = self.make_dump_req(cr_rpc.DUMP)
+			req.opts.notify_scripts = True
+			req.opts.file_locks = True
+			req.opts.evasive_devices = True
+			req.opts.link_remap = True
+			if self.htype.can_migrate_tcp:
+				req.opts.tcp_established = True
+
+			cc.send_req(req, False)
+
+			while True:
+				resp = cc.recv_resp()
+				if resp.type != cr_rpc.NOTIFY:
+					if resp.type == cr_rpc.DUMP and not resp.success:
+						raise Exception("Dump failed")
+					else:
+						raise Exception("Unexpected responce from service (%d)" % resp.type)
+
+				if resp.notify.script == "post-dump":
+					#
+					# Dump is effectively over. Now CRIU
+					# waits for us to do whatever we want
+					# and keeps the tasks frozen.
+					#
+					break
 
-			elif resp.notify.script == "network-lock":
-				self.htype.net_lock()
-			elif resp.notify.script == "network-unlock":
-				self.htype.net_unlock()
+				elif resp.notify.script == "network-lock":
+					self.htype.net_lock()
+				elif resp.notify.script == "network-unlock":
+					self.htype.net_unlock()
 
-			print "\t\tNotify (%s)" % resp.notify.script
-			cc.ack_notify()
+				print "\t\tNotify (%s)" % resp.notify.script
+				cc.ack_notify()
 
-		print "Dump complete"
-		self.th.end_iter()
+			print "Dump complete"
+			self.th.end_iter()
 
-		#
-		# Dump is complete -- go to target node,
-		# restore them there and kill (if required)
-		# tasks on source node
-		#
+			#
+			# Dump is complete -- go to target node,
+			# restore them there and kill (if required)
+			# tasks on source node
+			#
 
-		print "Final FS and images sync"
-		self.fs.stop_migration()
-		self.img.sync_imgs_to_target(self.th, self.htype)
+			print "Final FS and images sync"
+			self.fs.stop_migration()
+			self.img.sync_imgs_to_target(self.th, self.htype)
 
-		print "Asking target host to restore"
-		self.th.restore_from_images()
+			print "Asking target host to restore"
+			self.th.restore_from_images()
 
-		#
-		# Ack the notify after restore -- CRIU would
-		# then terminate all tasks and send us back
-	 	# DUMP/success message
-		#
+			#
+			# Ack the notify after restore -- CRIU would
+			# then terminate all tasks and send us back
+			# DUMP/success message
+			#
 
-		cc.ack_notify()
-		resp = cc.recv_resp()
-		if resp.type != cr_rpc.DUMP:
-			raise Exception("Dump failed")
+			cc.ack_notify()
+			resp = cc.recv_resp()
+			if resp.type != cr_rpc.DUMP:
+				raise Exception("Dump failed")
 
 		self.htype.umount()
 
diff --git a/p_haul_service.py b/p_haul_service.py
index ae243d6..3aae1b4 100644
--- a/p_haul_service.py
+++ b/p_haul_service.py
@@ -45,26 +45,26 @@ class phaul_service(rpyc.Service):
 
 	def start_page_server(self):
 		print "Starting page server for iter %d" % self.dump_iter
-		cc = cr_api.criu_conn()
-		cc.verbose(self.verb)
+		with cr_api.criu_conn() as cc:
+			cc.verbose(self.verb)
 
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.PAGE_SERVER
-		req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
+			req = cr_rpc.criu_req()
+			req.type = cr_rpc.PAGE_SERVER
+			req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
 
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		p_img = self.img.prev_image_dir()
-		if p_img:
-			req.opts.parent_img = p_img
+			req.opts.images_dir_fd = self.img.image_dir_fd()
+			req.opts.work_dir_fd = self.img.work_dir_fd()
+			p_img = self.img.prev_image_dir()
+			if p_img:
+				req.opts.parent_img = p_img
 
-		print "\tSending criu rpc req"
-		resp = cc.send_req(req)
-		if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
-			raise Exception("Failed to start page server")
+			print "\tSending criu rpc req"
+			resp = cc.send_req(req)
+			if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
+				raise Exception("Failed to start page server")
 
-		self.page_server_pid = resp.ps.pid
-		print "\tPage server started at %d" % resp.ps.pid
+			self.page_server_pid = resp.ps.pid
+			print "\tPage server started at %d" % resp.ps.pid
 
 	def exposed_start_iter(self):
 		self.dump_iter += 1
@@ -79,61 +79,61 @@ class phaul_service(rpyc.Service):
 
 	def exposed_restore_from_images(self):
 		print "Restoring from images"
-		cc = cr_api.criu_conn()
-		cc.verbose(self.verb)
-
-		self.htype.put_meta_images(self.img.image_dir())
-
-		req = cr_rpc.criu_req()
-		req.type = cr_rpc.RESTORE
-		req.opts.images_dir_fd = self.img.image_dir_fd()
-		req.opts.work_dir_fd = self.img.work_dir_fd()
-		req.opts.notify_scripts = True
-
-		if self.htype.can_migrate_tcp():
-			req.opts.tcp_established = True
-
-		for veth in self.htype.veths():
-			v = req.opts.veths.add()
-			v.if_in = veth.name
-			v.if_out = veth.pair
-
-		nroot = self.htype.mount()
-		if nroot:
-			req.opts.root = nroot
-			print "Restore root set to %s" % req.opts.root
-
-		cc.send_req(req, False)
-
-		while True:
-			resp = cc.recv_resp()
-			if resp.type == cr_rpc.NOTIFY:
-				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
-				if resp.notify.script == "setup-namespaces":
-					#
-					# At that point we have only one task
-					# living in namespaces and waiting for
-					# us to ACK the notify. Htype might want
-					# to configure namespace (external net
-					# devices) and cgroups
-					#
-					self.htype.prepare_ct(resp.notify.pid)
-				elif resp.notify.script == "network-unlock":
-					self.htype.net_unlock()
-				elif resp.notify.script == "network-lock":
-					raise Exception("Locking network on restore?")
-
-				cc.ack_notify()
-				continue
-
-			if resp.type != cr_rpc.RESTORE:
-				raise Exception("Unexpected responce from service (%d)" % resp.type)
-
-			if not resp.success:
-				raise Exception("Restore failed")
-
-			print "Restore succeeded"
-			break
+		with cr_api.criu_conn() as cc:
+			cc.verbose(self.verb)
+
+			self.htype.put_meta_images(self.img.image_dir())
+
+			req = cr_rpc.criu_req()
+			req.type = cr_rpc.RESTORE
+			req.opts.images_dir_fd = self.img.image_dir_fd()
+			req.opts.work_dir_fd = self.img.work_dir_fd()
+			req.opts.notify_scripts = True
+
+			if self.htype.can_migrate_tcp():
+				req.opts.tcp_established = True
+
+			for veth in self.htype.veths():
+				v = req.opts.veths.add()
+				v.if_in = veth.name
+				v.if_out = veth.pair
+
+			nroot = self.htype.mount()
+			if nroot:
+				req.opts.root = nroot
+				print "Restore root set to %s" % req.opts.root
+
+			cc.send_req(req, False)
+
+			while True:
+				resp = cc.recv_resp()
+				if resp.type == cr_rpc.NOTIFY:
+					print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
+					if resp.notify.script == "setup-namespaces":
+						#
+						# At that point we have only one task
+						# living in namespaces and waiting for
+						# us to ACK the notify. Htype might want
+						# to configure namespace (external net
+						# devices) and cgroups
+						#
+						self.htype.prepare_ct(resp.notify.pid)
+					elif resp.notify.script == "network-unlock":
+						self.htype.net_unlock()
+					elif resp.notify.script == "network-lock":
+						raise Exception("Locking network on restore?")
+
+					cc.ack_notify()
+					continue
+
+				if resp.type != cr_rpc.RESTORE:
+					raise Exception("Unexpected responce from service (%d)" % resp.type)
+
+				if not resp.success:
+					raise Exception("Restore failed")
+
+				print "Restore succeeded"
+				break
 
 		self.htype.restored(resp.restore.pid)
 		self.restored = True
-- 
1.8.5.3



More information about the CRIU mailing list