[Pkg-bitcoin-commits] [python-quamash] 53/269: Fix subprocess on Unix

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:15 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit 5362ebeedbcf903c1187c2620c630385664aa3e3
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date:   Sat Jul 5 18:29:06 2014 +0200

    Fix subprocess on Unix
---
 quamash/__init__.py | 135 +++++++++++---------------------
 quamash/_common.py  |   5 ++
 quamash/_unix.py    | 222 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 quamash/_windows.py |  22 +++++-
 4 files changed, 288 insertions(+), 96 deletions(-)

diff --git a/quamash/__init__.py b/quamash/__init__.py
index 5b80814..2535723 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -36,24 +36,27 @@ class _EventWorker(QtCore.QThread):
 		self.__stop = False
 		self.__selector = selector
 		self.__sig_events = parent.sig_events
-		self.semaphore = QtCore.QSemaphore()
+		self.__semaphore = QtCore.QSemaphore()
+
+	def start(self):
+		super().start()
+		self.__semaphore.acquire()
 
 	def stop(self):
 		self.__stop = True
 		# Wait for thread to end
-		self.semaphore.acquire()
+		self.wait()
 
 	def run(self):
 		self._logger.debug('Thread started')
-		self.semaphore.release()
+		self.__semaphore.release()
 
 		while not self.__stop:
-			events = self.__selector.select(0.01)
+			events = self.__selector.select(0.1)
 			if events:
-				self._logger.debug('Got events {}'.format(events))
-				self.__sig_events.emit(list(events))
+				self._logger.debug('Got events from poll: {}'.format(events))
+				self.__sig_events.emit(events)
 
-		self.semaphore.release()
 		self._logger.debug('Exiting thread')
 
 
@@ -62,16 +65,10 @@ class _EventPoller(QtCore.QObject):
 	"""Polling of events in separate thread."""
 	sig_events = QtCore.Signal(list)
 
-	def __init__(self, selector):
-		super(_EventPoller, self).__init__()
-		self.__selector = selector
-
-	def start(self):
-		self._logger.debug('Starting...')
-		self.__worker = _EventWorker(self.__selector, self)
+	def start(self, selector):
+		self._logger.debug('Starting (selector: {})...'.format(selector))
+		self.__worker = _EventWorker(selector, self)
 		self.__worker.start()
-		# Wait for thread to start
-		self.__worker.semaphore.acquire()
 
 	def stop(self):
 		self._logger.debug('Stopping worker thread...')
@@ -92,8 +89,14 @@ class _QThreadWorker(QtCore.QThread):
 		super().__init__()
 
 	def run(self):
-		while not self.__stop:
-			future, callback, args, kwargs = self.__queue.get()
+		queue = self.__queue
+		while True:
+			command = queue.get()
+			if command is None:
+				# Stopping...
+				break
+
+			future, callback, args, kwargs = command
 			self._logger.debug('#{} got callback {} with args {} and kwargs {} from queue'
 				.format(self.__num, callback, args, kwargs))
 			if future.set_running_or_notify_cancel():
@@ -104,11 +107,11 @@ class _QThreadWorker(QtCore.QThread):
 			else:
 				self._logger.debug('Future was cancelled')
 
-		self._logger.debug('#{} stopped'.format(self.__num))
+		self._logger.debug('Thread #{} stopped'.format(self.__num))
 
-	def stop(self):
-		self._logger.debug('#{} stopping...'.format(self.__num))
-		self.__stop = True
+	def wait(self):
+		self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num))
+		super().wait()
 
 
 @with_logger
@@ -143,8 +146,11 @@ class QThreadExecutor(QtCore.QObject):
 
 	def close(self):
 		self._logger.debug('Closing')
+		for i in range(len(self.__workers)):
+			# Signal workers to stop
+			self.__queue.put(None)
 		for w in self.__workers:
-			w.stop()
+			w.wait()
 
 	def __enter__(self, *args):
 		return self
@@ -193,15 +199,13 @@ def _easycallback(fn):
 if os.name == 'nt':
 	from . import _windows
 	_baseclass = _windows.baseclass
-	_selector_cls = _windows.selector_cls
 else:
 	from . import _unix
 	_baseclass = _unix.baseclass
-	_selector_cls = _unix.selector_cls
 
 
 @with_logger
-class QEventLoop(QtCore.QObject, _baseclass):
+class QEventLoop(_baseclass):
 	"""
 	Implementation of asyncio event loop that uses the Qt Event loop
 	>>> @quamash.task
@@ -221,53 +225,25 @@ class QEventLoop(QtCore.QObject, _baseclass):
 		self.__debug_enabled = False
 		self.__default_executor = None
 		self.__exception_handler = None
-		self.__signal_safe_callbacks = []
-
-		print('Initing Qt')
-		QtCore.QObject.__init__(self)
-		print('Inited')
-		_baseclass.__init__(self, _selector_cls())
-
-		self.__event_poller = _EventPoller(self._selector)
-		self.__event_poller.sig_events.connect(self.__on_events)
 
-		socket_notifier = QtCore.QSocketNotifier(self._ssock.fileno(), QtCore.QSocketNotifier.Read, self)
-		socket_notifier.activated.connect(self.__wake_on_socket)
+		_baseclass.__init__(self)
 
-	def __wake_on_socket(self):
-		self._logger.info('Waking on socket notification')
-		for i, handle in enumerate(self.__signal_safe_callbacks[:]):
-			self.__add_callback(0, handle)
-			del self.__signal_safe_callbacks[i]
-
-	def _add_callback_signalsafe(self, handle):
-		"""Add callback in signal safe manner."""
-		self._logger.info('Adding callback signal safe: {}'.format(handle))
-		self.__signal_safe_callbacks.append(handle)
-		self._write_to_self()
+		# self.__event_poller = _EventPoller()
+		# self.__event_poller.sig_events.connect(self._process_events)
 
 	def run_forever(self):
 		"""Run eventloop forever."""
-		def stop_io_event_loop():
-			self.__io_event_loop.stop()
-			self._logger.debug('IO event loop stopped')
-			semaphore.release()
-
-		# self.__start_io_event_loop()
-
 		self.__is_running = True
-		self._logger.debug('Starting Qt event loop')
+		# self._logger.debug('Starting event poller')
+		# self.__event_poller.start(self._selector)
 		try:
-			self._logger.debug('Starting event poller')
-			self.__event_poller.start()
+			self._logger.debug('Starting Qt event loop')
 			rslt = self.__app.exec_()
 			self._logger.debug('Qt event loop ended with result {}'.format(rslt))
 			return rslt
 		finally:
-			self._logger.debug('Stopping event poller')
-			self.__event_poller.stop()
-			# self._logger.debug('Stopping IO event loop...')
-			# self.__io_event_loop.call_soon_threadsafe(stop_io_event_loop)
+			# self._logger.debug('Stopping event poller')
+			# self.__event_poller.stop()
 			self.__is_running = False
 
 	def run_until_complete(self, future):
@@ -292,9 +268,9 @@ class QEventLoop(QtCore.QObject, _baseclass):
 			self._logger.debug('Already stopped')
 			return
 
-		self._logger.debug('Stopping eventloop...')
+		self._logger.debug('Stopping event loop...')
 		self.__app.exit()
-		self._logger.debug('Stopped eventloop')
+		self._logger.debug('Stopped event loop')
 
 	def is_running(self):
 		"""Is event loop running?"""
@@ -320,15 +296,16 @@ class QEventLoop(QtCore.QObject, _baseclass):
 			.format(
 				callback, args, delay
 			))
-		self.__add_callback(delay, asyncio.Handle(callback, args, self))
+		self._add_callback(asyncio.Handle(callback, args, self), delay)
 
-	def __add_callback(self, delay, handle):
+	def _add_callback(self, handle, delay=0):
 		def upon_timeout():
 			self.__timers.remove(timer)
 			self._logger.debug('Callback timer fired, calling {}'.format(
 				handle))
 			handle._run()
 
+		self._logger.debug('Adding callback {} with delay {}'.format(handle, delay))
 		timer = QtCore.QTimer(self.__app)
 		timer.timeout.connect(upon_timeout)
 		timer.setSingleShot(True)
@@ -398,7 +375,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
 		context parameter has the same meaning as in
 		`call_exception_handler()`.
 		"""
-		self._logger.debug('Executing default exception handler')
+		self._logger.debug('Default exception handler executing')
 		message = context.get('message')
 		if not message:
 			message = 'Unhandled exception in event loop'
@@ -466,30 +443,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
 		finally:
 			asyncio.set_event_loop(None)
 
-	def __on_events(self, events):
-		for f, callback, transferred, key, ov in events:
-			try:
-				self._logger.debug('Invoking event callback {}'.format(callback))
-				value = callback(transferred, key, ov)
-			except OSError as e:
-				self._logger.warn('Event callback failed: {}'.format(e))
-				f.set_exception(e)
-			else:
-				f.set_result(value)
-
-	def __handler_helper(self, target, *args):
-		semaphore = threading.Semaphore(0)
-		handler = None
-
-		def helper_target():
-			nonlocal handler
-			handler = target(*args)
-			semaphore.release()
-
-		# self.__io_event_loop.call_soon_threadsafe(helper_target)
-		with semaphore:
-			return handler
-
 	def __start_io_event_loop(self):
 		"""Start the I/O event loop which we defer to for performing I/O on another thread.
 		"""
diff --git a/quamash/_common.py b/quamash/_common.py
index be9d792..b3fbcb0 100644
--- a/quamash/_common.py
+++ b/quamash/_common.py
@@ -1,4 +1,9 @@
 import logging
+try:
+	from PySide import QtCore
+except ImportError:
+	from PyQt5 import QtCore
+	QtCore.Signal = QtCore.pyqtSignal
 
 
 def with_logger(cls):
diff --git a/quamash/_unix.py b/quamash/_unix.py
index 139def0..d840ba5 100644
--- a/quamash/_unix.py
+++ b/quamash/_unix.py
@@ -1,6 +1,224 @@
 import asyncio
 from asyncio import selectors
+import collections
 
-baseclass = asyncio.SelectorEventLoop
+from ._common import QtCore, with_logger
 
-selector_cls = selectors.DefaultSelector
+
+EVENT_READ = (1 << 0)
+EVENT_WRITE = (1 << 1)
+
+
+def _fileobj_to_fd(fileobj):
+	"""Return a file descriptor from a file object.
+
+	Parameters:
+	fileobj -- file object or file descriptor
+
+	Returns:
+	corresponding file descriptor
+
+	Raises:
+	ValueError if the object is invalid
+	"""
+	if isinstance(fileobj, int):
+		fd = fileobj
+	else:
+		try:
+			fd = int(fileobj.fileno())
+		except (AttributeError, TypeError, ValueError):
+			raise ValueError("Invalid file object: {!r}".format(fileobj)) from None
+	if fd < 0:
+		raise ValueError("Invalid file descriptor: {}".format(fd))
+	return fd
+
+
+class _SelectorMapping(collections.Mapping):
+	"""Mapping of file objects to selector keys."""
+	def __init__(self, selector):
+		self._selector = selector
+
+	def __len__(self):
+		return len(self._selector._fd_to_key)
+
+	def __getitem__(self, fileobj):
+		try:
+			fd = self._selector._fileobj_lookup(fileobj)
+			return self._selector._fd_to_key[fd]
+		except KeyError:
+			raise KeyError("{!r} is not registered".format(fileobj)) from None
+
+	def __iter__(self):
+		return iter(self._selector._fd_to_key)
+
+
+ at with_logger
+class _Selector(selectors.BaseSelector):
+	def __init__(self, parent):
+		# this maps file descriptors to keys
+		self._fd_to_key = {}
+		# read-only mapping returned by get_map()
+		self.__map = _SelectorMapping(self)
+		self.__read_notifiers = {}
+		self.__write_notifiers = {}
+		self.__parent = parent
+
+	def select(self, *args, **kwargs):
+		"""Implement abstract method even though we don't need it."""
+		raise NotImplemented
+
+	def _fileobj_lookup(self, fileobj):
+		"""Return a file descriptor from a file object.
+
+		This wraps _fileobj_to_fd() to do an exhaustive search in case
+		the object is invalid but we still have it in our map.  This
+		is used by unregister() so we can unregister an object that
+		was previously registered even if it is closed.  It is also
+		used by _SelectorMapping.
+		"""
+		try:
+			return _fileobj_to_fd(fileobj)
+		except ValueError:
+			# Do an exhaustive search.
+			for key in self._fd_to_key.values():
+				if key.fileobj is fileobj:
+					return key.fd
+			# Raise ValueError after all.
+			raise
+
+	def register(self, fileobj, events, data=None):
+		if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
+			raise ValueError("Invalid events: {!r}".format(events))
+
+		key = selectors.SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
+
+		if key.fd in self._fd_to_key:
+			raise KeyError("{!r} (FD {}) is already registered"
+						   .format(fileobj, key.fd))
+
+		self._fd_to_key[key.fd] = key
+
+		if events & EVENT_READ:
+			notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Read)
+			notifier.activated.connect(self.__on_read_activated)
+			self.__read_notifiers[key.fd] = notifier
+		if events & EVENT_WRITE:
+			notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Write)
+			notifier.activated.connect(self.__on_write_activated)
+			self.__write_notifiers[key.fd] = notifier
+
+		return key
+
+	def __on_read_activated(self, fd):
+		self._logger.debug('File {} ready to read'.format(fd))
+		key = self._key_from_fd(fd)
+		if key:
+			self.__parent._process_event(key, EVENT_READ & key.events)
+
+	def __on_write_activated(self, fd):
+		self._logger.debug('File {} ready to write'.format(fd))
+		key = self._key_from_fd(fd)
+		if key:
+			self.__parent._process_event(key, EVENT_WRITE & key.events)
+
+	def unregister(self, fileobj):
+		try:
+			key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
+		except KeyError:
+			raise KeyError("{!r} is not registered".format(fileobj)) from None
+
+		try:
+			del self.__read_notifiers[key.fd]
+		except KeyError:
+			 pass
+		try:
+			del self.__write_notifiers[key.fd]
+		except KeyError:
+			pass
+
+		return key
+
+	def modify(self, fileobj, events, data=None):
+		try:
+			key = self._fd_to_key[self._fileobj_lookup(fileobj)]
+		except KeyError:
+			raise KeyError("{!r} is not registered".format(fileobj)) from None
+		if events != key.events:
+			self.unregister(fileobj)
+			key = self.register(fileobj, events, data)
+		elif data != key.data:
+			# Use a shortcut to update the data.
+			key = key._replace(data=data)
+			self._fd_to_key[key.fd] = key
+		return key
+
+	def close(self):
+		self._logger.debug('Closing')
+		self._fd_to_key.clear()
+		self.__read_notifiers.clear()
+		self.__write_notifiers.clear()
+
+	def get_map(self):
+		return self.__map
+
+	def _key_from_fd(self, fd):
+		"""Return the key associated to a given file descriptor.
+
+		Parameters:
+		fd -- file descriptor
+
+		Returns:
+		corresponding key, or None if not found
+		"""
+		try:
+			return self._fd_to_key[fd]
+		except KeyError:
+			return None
+
+
+class _SelectorEventLoop(asyncio.SelectorEventLoop):
+	def __init__(self):
+		self._signal_safe_callbacks = []
+
+		selector = _Selector(self)
+		asyncio.SelectorEventLoop.__init__(self, selector)
+
+		socket_notifier = self.__socket_notifier = QtCore.QSocketNotifier(
+			self._ssock.fileno(), QtCore.QSocketNotifier.Read)
+		socket_notifier.activated.connect(self.__wake_on_socket)
+
+	def _process_event(self, key, mask):
+		"""Selector has delivered us an event."""
+		self._logger.debug('Processing event with key {} and mask {}'.format(key, mask))
+		fileobj, (reader, writer) = key.fileobj, key.data
+		if mask & selectors.EVENT_READ and reader is not None:
+			if reader._cancelled:
+				self.remove_reader(fileobj)
+			else:
+				self._logger.debug('Invoking reader callback: {}'.format(reader))
+				reader._run()
+		if mask & selectors.EVENT_WRITE and writer is not None:
+			if writer._cancelled:
+				self.remove_writer(fileobj)
+			else:
+				self._logger.debug('Invoking writer callback: {}'.format(writer))
+				writer._run()
+
+	def _add_callback_signalsafe(self, handle):
+		"""Add callback in signal safe manner."""
+		self._signal_safe_callbacks.append(handle)
+		self._write_to_self()
+
+	def __wake_on_socket(self):
+		self._logger.debug('Waking on socket notification, {} signal callback(s) waiting'.format(
+			len(self._signal_safe_callbacks)
+		))
+		# Acknowledge command
+		self._ssock.recv(1)
+		for handle in self._signal_safe_callbacks[:]:
+			self._logger.debug('Scheduling signal callback {}'.format(handle))
+			self._signal_safe_callbacks.remove(handle)
+			self._add_callback(handle)
+
+
+baseclass = _SelectorEventLoop
diff --git a/quamash/_windows.py b/quamash/_windows.py
index ca36be6..85860b5 100644
--- a/quamash/_windows.py
+++ b/quamash/_windows.py
@@ -5,7 +5,25 @@ from asyncio import windows_events
 from asyncio import _overlapped
 import math
 
-from ._common import with_logger
+from ._common import with_logger, QtCore
+
+
+class ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
+	def __init__(self):
+		QtCore.QObject.__init__(self)
+		asyncio.ProactorEventLoop.__init__(self, _IocpProactor())
+
+	def _process_events(self, events):
+		for f, callback, transferred, key, ov in events:
+			try:
+				self._logger.debug('Invoking event callback {}'.format(callback))
+				value = callback(transferred, key, ov)
+			except OSError as e:
+				self._logger.warn('Event callback failed: {}'.format(e))
+				f.set_exception(e)
+			else:
+				f.set_result(value)
+
 
 baseclass = asyncio.ProactorEventLoop
 
@@ -67,5 +85,3 @@ class _IocpProactor(windows_events.IocpProactor):
 				self.__events.append((f, callback, transferred, key, ov))
 
 			ms = 0
-
-selector_cls = _IocpProactor

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list