[CRIU] [PATCH] [RFC] p.haul: close socket to the CRIU service if something failed

Andrew Vagin avagin at parallels.com
Mon Mar 24 06:19:20 PDT 2014


Sorry, I forgot to remove the debug messages (the print statements in __enter__ and __exit__) from this patch.

On Mon, Mar 24, 2014 at 05:16:11PM +0400, Andrey Vagin wrote:
> Currently the socket to the CRIU service is not closed in error cases. For example:
> 
> [src] $ ./p.haul ovz 1001 192.168.122.36
> Asking target host to restore
> ======= Remote traceback =======
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 300, in _dispatch_request
>     res = self._HANDLERS[handler](self, *args)
>   File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 532, in _handle_call
>     return self._local_objects[oid](*args, **dict(kwargs))
>   File "/vz/lxc/p.haul/p_haul_service.py", line 122, in exposed_restore_from_images
>     self.htype.net_unlock()
>   File "/vz/lxc/p.haul/p_haul_ovz.py", line 177, in net_unlock
>     netif.ifup(veth.pairs)
> AttributeError: net_dev instance has no attribute 'pairs'
> 
> ======= Local exception ========
> Traceback (most recent call last):
>   File "./p.haul", line 39, in <module>
>     worker.start_migration()
>   File "/vz/lxc/p.haul/p_haul_iters.py", line 202, in start_migration
>     self.th.restore_from_images()
>   File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 196, in __call__
>     return syncreq(_self, consts.HANDLE_CALL, args, kwargs)
>   File "/usr/lib/python2.7/site-packages/rpyc/core/netref.py", line 71, in syncreq
>     return conn.sync_request(handler, oid, *args)
>   File "/usr/lib/python2.7/site-packages/rpyc/core/protocol.py", line 438, in sync_request
>     raise obj
> AttributeError: net_dev instance has no attribute 'pairs'
> 
> [src] $ ps axf
> ...
>   912 ?        Ss     0:00 init
> ...
> 
> [dst] $ ps axf
> ...
> 28170 ?        Ss     0:00 ./criu service -v4 -o criu.service.log --daemon
> 28293 ?        S      0:00  \_ ./criu service -v4 -o criu.service.log --daemon
> 28295 ?        Ss     0:00      \_ init
> 
> [dst] # lsof -p 28293 | grep unix
> criu    28293 root    4u  unix 0xffff8800b9a3d280      0t0      79903 /var/run/criu_service.socket
> [dst] # ss -x | grep 79903
> u_dgr  ESTAB      0      0      /var/run/criu_service.socket 79903                 * 79401
> u_dgr  ESTAB      0      0                    * 79401                 * 79903
> 
> Signed-off-by: Andrey Vagin <avagin at openvz.org>
> ---
>  p_haul_criu.py    |  10 ++-
>  p_haul_iters.py   | 212 +++++++++++++++++++++++++++---------------------------
>  p_haul_service.py | 142 ++++++++++++++++++------------------
>  3 files changed, 185 insertions(+), 179 deletions(-)
> 
> diff --git a/p_haul_criu.py b/p_haul_criu.py
> index ef40710..764a2bc 100644
> --- a/p_haul_criu.py
> +++ b/p_haul_criu.py
> @@ -24,10 +24,16 @@ def_verb = 2
>  #
>  
>  class criu_conn:
> -	def __init__(self):
> +	def __enter__(self):
> +		print "--------- __enter__ --------"
>  		self.cs = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
>  		self.cs.connect(criu_socket)
>  		self.verb = def_verb
> +		return self
> +
> +	def __exit__(self, type, value, traceback):
> +		print "--------- __exit__ --------"
> +		self.cs.close()
>  
>  	def verbose(self, level):
>  		self.verb = level
> @@ -44,7 +50,7 @@ class criu_conn:
>  		resp.ParseFromString(self.cs.recv(1024))
>  		return resp
>  
> -	def ack_notify(self):
> +	def ack_notify(self, success = True):
>  		req = cr_rpc.criu_req()
>  		req.type = cr_rpc.NOTIFY
>  		req.notify_success = True
> diff --git a/p_haul_iters.py b/p_haul_iters.py
> index 92797ea..c6cbfb0 100644
> --- a/p_haul_iters.py
> +++ b/p_haul_iters.py
> @@ -76,141 +76,141 @@ class phaul_iter_worker:
>  
>  	def start_migration(self):
>  		print "Connecting to CRIU service"
> -		cc = cr_api.criu_conn()
> -		cc.verbose(self.verb)
> +		with cr_api.criu_conn() as cc:
> +			cc.verbose(self.verb)
>  
> -		start_time = time.time()
> -		iter_times = []
> +			start_time = time.time()
> +			iter_times = []
>  
> -		print "Preliminary FS migration"
> -		self.fs.set_work_dir(self.img.work_dir())
> -		self.fs.start_migration()
> +			print "Preliminary FS migration"
> +			self.fs.set_work_dir(self.img.work_dir())
> +			self.fs.start_migration()
>  
> -		print "Starting iterations"
> -		while True:
> -			print "* Iteration %d" % self.iteration
> +			print "Starting iterations"
> +			while True:
> +				print "* Iteration %d" % self.iteration
>  
> -			self.th.start_iter()
> -			self.img.new_image_dir()
> +				self.th.start_iter()
> +				self.img.new_image_dir()
>  
> -			print "\tIssuing pre-dump command to service"
> +				print "\tIssuing pre-dump command to service"
>  
> -			req = self.make_dump_req(cr_rpc.PRE_DUMP)
> -			resp = cc.send_req(req)
> -			if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
> -				raise Exception("Pre-dump failed")
> +				req = self.make_dump_req(cr_rpc.PRE_DUMP)
> +				resp = cc.send_req(req)
> +				if (resp.type != cr_rpc.PRE_DUMP) or (not resp.success):
> +					raise Exception("Pre-dump failed")
>  
> -			print "\tPre-dump succeeded"
> +				print "\tPre-dump succeeded"
>  
> -			self.th.end_iter()
> +				self.th.end_iter()
>  
> -			stats = cr_api.criu_get_dstats(self.img)
> -			print "Dumped %d pages, %d skipped" % \
> -					(stats.pages_written, stats.pages_skipped_parent)
> +				stats = cr_api.criu_get_dstats(self.img)
> +				print "Dumped %d pages, %d skipped" % \
> +						(stats.pages_written, stats.pages_skipped_parent)
>  
> -			iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
> -			self.frozen_time += stats.frozen_time
> +				iter_times.append("%.2lf" % (stats.frozen_time / 1000000.))
> +				self.frozen_time += stats.frozen_time
>  
> -			#
> -			# Need to decide whether we do next iteration
> -			# or stop on the existing and go do full dump
> -			# and restore
> -			#
> -
> -			print "Checking iteration progress:"
> -
> -			if stats.pages_written <= phaul_iter_min_size:
> -				print "\t> Small dump"
> -				break;
> -
> -			if self.prev_stats:
> -				w_add = stats.pages_written - self.prev_stats.pages_written
> -				w_add = w_add * 100 / self.prev_stats.pages_written
> -				if w_add > phaul_iter_grow_max:
> -					print "\t> Iteration grows"
> -					break
> +				#
> +				# Need to decide whether we do next iteration
> +				# or stop on the existing and go do full dump
> +				# and restore
> +				#
>  
> -			if self.iteration >= phaul_iter_max:
> -				print "\t> Too many iterations"
> -				break
> +				print "Checking iteration progress:"
>  
> -			self.iteration += 1
> -			self.prev_stats = stats
> -			print "\t> Proceed to next iteration"
> +				if stats.pages_written <= phaul_iter_min_size:
> +					print "\t> Small dump"
> +					break;
>  
> -			self.fs.next_iteration()
> +				if self.prev_stats:
> +					w_add = stats.pages_written - self.prev_stats.pages_written
> +					w_add = w_add * 100 / self.prev_stats.pages_written
> +					if w_add > phaul_iter_grow_max:
> +						print "\t> Iteration grows"
> +						break
>  
> -		#
> -		# Finish with iterations -- do full dump, send images
> -		# to target host and restore from them there
> -		#
> +				if self.iteration >= phaul_iter_max:
> +					print "\t> Too many iterations"
> +					break
>  
> -		print "Final dump and restore"
> +				self.iteration += 1
> +				self.prev_stats = stats
> +				print "\t> Proceed to next iteration"
>  
> -		self.th.start_iter()
> -		self.img.new_image_dir()
> +				self.fs.next_iteration()
>  
> -		print "\tIssuing dump command to service"
> -		req = self.make_dump_req(cr_rpc.DUMP)
> -		req.opts.notify_scripts = True
> -		req.opts.file_locks = True
> -		req.opts.evasive_devices = True
> -		req.opts.link_remap = True
> -		if self.htype.can_migrate_tcp:
> -			req.opts.tcp_established = True
> +			#
> +			# Finish with iterations -- do full dump, send images
> +			# to target host and restore from them there
> +			#
>  
> -		cc.send_req(req, False)
> +			print "Final dump and restore"
>  
> -		while True:
> -			resp = cc.recv_resp()
> -			if resp.type != cr_rpc.NOTIFY:
> -				if resp.type == cr_rpc.DUMP and not resp.success:
> -					raise Exception("Dump failed")
> -				else:
> -					raise Exception("Unexpected responce from service (%d)" % resp.type)
> +			self.th.start_iter()
> +			self.img.new_image_dir()
>  
> -			if resp.notify.script == "post-dump":
> -				#
> -				# Dump is effectively over. Now CRIU
> -				# waits for us to do whatever we want
> -				# and keeps the tasks frozen.
> -				#
> -				break
> +			print "\tIssuing dump command to service"
> +			req = self.make_dump_req(cr_rpc.DUMP)
> +			req.opts.notify_scripts = True
> +			req.opts.file_locks = True
> +			req.opts.evasive_devices = True
> +			req.opts.link_remap = True
> +			if self.htype.can_migrate_tcp:
> +				req.opts.tcp_established = True
> +
> +			cc.send_req(req, False)
> +
> +			while True:
> +				resp = cc.recv_resp()
> +				if resp.type != cr_rpc.NOTIFY:
> +					if resp.type == cr_rpc.DUMP and not resp.success:
> +						raise Exception("Dump failed")
> +					else:
> +						raise Exception("Unexpected responce from service (%d)" % resp.type)
> +
> +				if resp.notify.script == "post-dump":
> +					#
> +					# Dump is effectively over. Now CRIU
> +					# waits for us to do whatever we want
> +					# and keeps the tasks frozen.
> +					#
> +					break
>  
> -			elif resp.notify.script == "network-lock":
> -				self.htype.net_lock()
> -			elif resp.notify.script == "network-unlock":
> -				self.htype.net_unlock()
> +				elif resp.notify.script == "network-lock":
> +					self.htype.net_lock()
> +				elif resp.notify.script == "network-unlock":
> +					self.htype.net_unlock()
>  
> -			print "\t\tNotify (%s)" % resp.notify.script
> -			cc.ack_notify()
> +				print "\t\tNotify (%s)" % resp.notify.script
> +				cc.ack_notify()
>  
> -		print "Dump complete"
> -		self.th.end_iter()
> +			print "Dump complete"
> +			self.th.end_iter()
>  
> -		#
> -		# Dump is complete -- go to target node,
> -		# restore them there and kill (if required)
> -		# tasks on source node
> -		#
> +			#
> +			# Dump is complete -- go to target node,
> +			# restore them there and kill (if required)
> +			# tasks on source node
> +			#
>  
> -		print "Final FS and images sync"
> -		self.fs.stop_migration()
> -		self.img.sync_imgs_to_target(self.th, self.htype)
> +			print "Final FS and images sync"
> +			self.fs.stop_migration()
> +			self.img.sync_imgs_to_target(self.th, self.htype)
>  
> -		print "Asking target host to restore"
> -		self.th.restore_from_images()
> +			print "Asking target host to restore"
> +			self.th.restore_from_images()
>  
> -		#
> -		# Ack the notify after restore -- CRIU would
> -		# then terminate all tasks and send us back
> -	 	# DUMP/success message
> -		#
> +			#
> +			# Ack the notify after restore -- CRIU would
> +			# then terminate all tasks and send us back
> +			# DUMP/success message
> +			#
>  
> -		cc.ack_notify()
> -		resp = cc.recv_resp()
> -		if resp.type != cr_rpc.DUMP:
> -			raise Exception("Dump failed")
> +			cc.ack_notify()
> +			resp = cc.recv_resp()
> +			if resp.type != cr_rpc.DUMP:
> +				raise Exception("Dump failed")
>  
>  		self.htype.umount()
>  
> diff --git a/p_haul_service.py b/p_haul_service.py
> index ae243d6..3aae1b4 100644
> --- a/p_haul_service.py
> +++ b/p_haul_service.py
> @@ -45,26 +45,26 @@ class phaul_service(rpyc.Service):
>  
>  	def start_page_server(self):
>  		print "Starting page server for iter %d" % self.dump_iter
> -		cc = cr_api.criu_conn()
> -		cc.verbose(self.verb)
> +		with cr_api.criu_conn() as cc:
> +			cc.verbose(self.verb)
>  
> -		req = cr_rpc.criu_req()
> -		req.type = cr_rpc.PAGE_SERVER
> -		req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
> +			req = cr_rpc.criu_req()
> +			req.type = cr_rpc.PAGE_SERVER
> +			req.opts.ps.port = ps_start_port + self.dump_iter # FIXME -- implement and use autobind in CRIU
>  
> -		req.opts.images_dir_fd = self.img.image_dir_fd()
> -		req.opts.work_dir_fd = self.img.work_dir_fd()
> -		p_img = self.img.prev_image_dir()
> -		if p_img:
> -			req.opts.parent_img = p_img
> +			req.opts.images_dir_fd = self.img.image_dir_fd()
> +			req.opts.work_dir_fd = self.img.work_dir_fd()
> +			p_img = self.img.prev_image_dir()
> +			if p_img:
> +				req.opts.parent_img = p_img
>  
> -		print "\tSending criu rpc req"
> -		resp = cc.send_req(req)
> -		if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
> -			raise Exception("Failed to start page server")
> +			print "\tSending criu rpc req"
> +			resp = cc.send_req(req)
> +			if (resp.type != cr_rpc.PAGE_SERVER) or (not resp.success):
> +				raise Exception("Failed to start page server")
>  
> -		self.page_server_pid = resp.ps.pid
> -		print "\tPage server started at %d" % resp.ps.pid
> +			self.page_server_pid = resp.ps.pid
> +			print "\tPage server started at %d" % resp.ps.pid
>  
>  	def exposed_start_iter(self):
>  		self.dump_iter += 1
> @@ -79,61 +79,61 @@ class phaul_service(rpyc.Service):
>  
>  	def exposed_restore_from_images(self):
>  		print "Restoring from images"
> -		cc = cr_api.criu_conn()
> -		cc.verbose(self.verb)
> -
> -		self.htype.put_meta_images(self.img.image_dir())
> -
> -		req = cr_rpc.criu_req()
> -		req.type = cr_rpc.RESTORE
> -		req.opts.images_dir_fd = self.img.image_dir_fd()
> -		req.opts.work_dir_fd = self.img.work_dir_fd()
> -		req.opts.notify_scripts = True
> -
> -		if self.htype.can_migrate_tcp():
> -			req.opts.tcp_established = True
> -
> -		for veth in self.htype.veths():
> -			v = req.opts.veths.add()
> -			v.if_in = veth.name
> -			v.if_out = veth.pair
> -
> -		nroot = self.htype.mount()
> -		if nroot:
> -			req.opts.root = nroot
> -			print "Restore root set to %s" % req.opts.root
> -
> -		cc.send_req(req, False)
> -
> -		while True:
> -			resp = cc.recv_resp()
> -			if resp.type == cr_rpc.NOTIFY:
> -				print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
> -				if resp.notify.script == "setup-namespaces":
> -					#
> -					# At that point we have only one task
> -					# living in namespaces and waiting for
> -					# us to ACK the notify. Htype might want
> -					# to configure namespace (external net
> -					# devices) and cgroups
> -					#
> -					self.htype.prepare_ct(resp.notify.pid)
> -				elif resp.notify.script == "network-unlock":
> -					self.htype.net_unlock()
> -				elif resp.notify.script == "network-lock":
> -					raise Exception("Locking network on restore?")
> -
> -				cc.ack_notify()
> -				continue
> -
> -			if resp.type != cr_rpc.RESTORE:
> -				raise Exception("Unexpected responce from service (%d)" % resp.type)
> -
> -			if not resp.success:
> -				raise Exception("Restore failed")
> -
> -			print "Restore succeeded"
> -			break
> +		with cr_api.criu_conn() as cc:
> +			cc.verbose(self.verb)
> +
> +			self.htype.put_meta_images(self.img.image_dir())
> +
> +			req = cr_rpc.criu_req()
> +			req.type = cr_rpc.RESTORE
> +			req.opts.images_dir_fd = self.img.image_dir_fd()
> +			req.opts.work_dir_fd = self.img.work_dir_fd()
> +			req.opts.notify_scripts = True
> +
> +			if self.htype.can_migrate_tcp():
> +				req.opts.tcp_established = True
> +
> +			for veth in self.htype.veths():
> +				v = req.opts.veths.add()
> +				v.if_in = veth.name
> +				v.if_out = veth.pair
> +
> +			nroot = self.htype.mount()
> +			if nroot:
> +				req.opts.root = nroot
> +				print "Restore root set to %s" % req.opts.root
> +
> +			cc.send_req(req, False)
> +
> +			while True:
> +				resp = cc.recv_resp()
> +				if resp.type == cr_rpc.NOTIFY:
> +					print "\t\tNotify (%s.%d)" % (resp.notify.script, resp.notify.pid)
> +					if resp.notify.script == "setup-namespaces":
> +						#
> +						# At that point we have only one task
> +						# living in namespaces and waiting for
> +						# us to ACK the notify. Htype might want
> +						# to configure namespace (external net
> +						# devices) and cgroups
> +						#
> +						self.htype.prepare_ct(resp.notify.pid)
> +					elif resp.notify.script == "network-unlock":
> +						self.htype.net_unlock()
> +					elif resp.notify.script == "network-lock":
> +						raise Exception("Locking network on restore?")
> +
> +					cc.ack_notify()
> +					continue
> +
> +				if resp.type != cr_rpc.RESTORE:
> +					raise Exception("Unexpected responce from service (%d)" % resp.type)
> +
> +				if not resp.success:
> +					raise Exception("Restore failed")
> +
> +				print "Restore succeeded"
> +				break
>  
>  		self.htype.restored(resp.restore.pid)
>  		self.restored = True
> -- 
> 1.8.5.3
> 


More information about the CRIU mailing list