[Pkg-bitcoin-commits] [python-quamash] 53/269: Fix subprocess on Unix
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:15 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 5362ebeedbcf903c1187c2620c630385664aa3e3
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Sat Jul 5 18:29:06 2014 +0200
Fix subprocess on Unix
---
quamash/__init__.py | 135 +++++++++++---------------------
quamash/_common.py | 5 ++
quamash/_unix.py | 222 +++++++++++++++++++++++++++++++++++++++++++++++++++-
quamash/_windows.py | 22 +++++-
4 files changed, 288 insertions(+), 96 deletions(-)
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 5b80814..2535723 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -36,24 +36,27 @@ class _EventWorker(QtCore.QThread):
self.__stop = False
self.__selector = selector
self.__sig_events = parent.sig_events
- self.semaphore = QtCore.QSemaphore()
+ self.__semaphore = QtCore.QSemaphore()
+
+ def start(self):
+ super().start()
+ self.__semaphore.acquire()
def stop(self):
self.__stop = True
# Wait for thread to end
- self.semaphore.acquire()
+ self.wait()
def run(self):
self._logger.debug('Thread started')
- self.semaphore.release()
+ self.__semaphore.release()
while not self.__stop:
- events = self.__selector.select(0.01)
+ events = self.__selector.select(0.1)
if events:
- self._logger.debug('Got events {}'.format(events))
- self.__sig_events.emit(list(events))
+ self._logger.debug('Got events from poll: {}'.format(events))
+ self.__sig_events.emit(events)
- self.semaphore.release()
self._logger.debug('Exiting thread')
@@ -62,16 +65,10 @@ class _EventPoller(QtCore.QObject):
"""Polling of events in separate thread."""
sig_events = QtCore.Signal(list)
- def __init__(self, selector):
- super(_EventPoller, self).__init__()
- self.__selector = selector
-
- def start(self):
- self._logger.debug('Starting...')
- self.__worker = _EventWorker(self.__selector, self)
+ def start(self, selector):
+ self._logger.debug('Starting (selector: {})...'.format(selector))
+ self.__worker = _EventWorker(selector, self)
self.__worker.start()
- # Wait for thread to start
- self.__worker.semaphore.acquire()
def stop(self):
self._logger.debug('Stopping worker thread...')
@@ -92,8 +89,14 @@ class _QThreadWorker(QtCore.QThread):
super().__init__()
def run(self):
- while not self.__stop:
- future, callback, args, kwargs = self.__queue.get()
+ queue = self.__queue
+ while True:
+ command = queue.get()
+ if command is None:
+ # Stopping...
+ break
+
+ future, callback, args, kwargs = command
self._logger.debug('#{} got callback {} with args {} and kwargs {} from queue'
.format(self.__num, callback, args, kwargs))
if future.set_running_or_notify_cancel():
@@ -104,11 +107,11 @@ class _QThreadWorker(QtCore.QThread):
else:
self._logger.debug('Future was cancelled')
- self._logger.debug('#{} stopped'.format(self.__num))
+ self._logger.debug('Thread #{} stopped'.format(self.__num))
- def stop(self):
- self._logger.debug('#{} stopping...'.format(self.__num))
- self.__stop = True
+ def wait(self):
+ self._logger.debug('Waiting for thread #{} to stop...'.format(self.__num))
+ super().wait()
@with_logger
@@ -143,8 +146,11 @@ class QThreadExecutor(QtCore.QObject):
def close(self):
self._logger.debug('Closing')
+ for i in range(len(self.__workers)):
+ # Signal workers to stop
+ self.__queue.put(None)
for w in self.__workers:
- w.stop()
+ w.wait()
def __enter__(self, *args):
return self
@@ -193,15 +199,13 @@ def _easycallback(fn):
if os.name == 'nt':
from . import _windows
_baseclass = _windows.baseclass
- _selector_cls = _windows.selector_cls
else:
from . import _unix
_baseclass = _unix.baseclass
- _selector_cls = _unix.selector_cls
@with_logger
-class QEventLoop(QtCore.QObject, _baseclass):
+class QEventLoop(_baseclass):
"""
Implementation of asyncio event loop that uses the Qt Event loop
>>> @quamash.task
@@ -221,53 +225,25 @@ class QEventLoop(QtCore.QObject, _baseclass):
self.__debug_enabled = False
self.__default_executor = None
self.__exception_handler = None
- self.__signal_safe_callbacks = []
-
- print('Initing Qt')
- QtCore.QObject.__init__(self)
- print('Inited')
- _baseclass.__init__(self, _selector_cls())
-
- self.__event_poller = _EventPoller(self._selector)
- self.__event_poller.sig_events.connect(self.__on_events)
- socket_notifier = QtCore.QSocketNotifier(self._ssock.fileno(), QtCore.QSocketNotifier.Read, self)
- socket_notifier.activated.connect(self.__wake_on_socket)
+ _baseclass.__init__(self)
- def __wake_on_socket(self):
- self._logger.info('Waking on socket notification')
- for i, handle in enumerate(self.__signal_safe_callbacks[:]):
- self.__add_callback(0, handle)
- del self.__signal_safe_callbacks[i]
-
- def _add_callback_signalsafe(self, handle):
- """Add callback in signal safe manner."""
- self._logger.info('Adding callback signal safe: {}'.format(handle))
- self.__signal_safe_callbacks.append(handle)
- self._write_to_self()
+ # self.__event_poller = _EventPoller()
+ # self.__event_poller.sig_events.connect(self._process_events)
def run_forever(self):
"""Run eventloop forever."""
- def stop_io_event_loop():
- self.__io_event_loop.stop()
- self._logger.debug('IO event loop stopped')
- semaphore.release()
-
- # self.__start_io_event_loop()
-
self.__is_running = True
- self._logger.debug('Starting Qt event loop')
+ # self._logger.debug('Starting event poller')
+ # self.__event_poller.start(self._selector)
try:
- self._logger.debug('Starting event poller')
- self.__event_poller.start()
+ self._logger.debug('Starting Qt event loop')
rslt = self.__app.exec_()
self._logger.debug('Qt event loop ended with result {}'.format(rslt))
return rslt
finally:
- self._logger.debug('Stopping event poller')
- self.__event_poller.stop()
- # self._logger.debug('Stopping IO event loop...')
- # self.__io_event_loop.call_soon_threadsafe(stop_io_event_loop)
+ # self._logger.debug('Stopping event poller')
+ # self.__event_poller.stop()
self.__is_running = False
def run_until_complete(self, future):
@@ -292,9 +268,9 @@ class QEventLoop(QtCore.QObject, _baseclass):
self._logger.debug('Already stopped')
return
- self._logger.debug('Stopping eventloop...')
+ self._logger.debug('Stopping event loop...')
self.__app.exit()
- self._logger.debug('Stopped eventloop')
+ self._logger.debug('Stopped event loop')
def is_running(self):
"""Is event loop running?"""
@@ -320,15 +296,16 @@ class QEventLoop(QtCore.QObject, _baseclass):
.format(
callback, args, delay
))
- self.__add_callback(delay, asyncio.Handle(callback, args, self))
+ self._add_callback(asyncio.Handle(callback, args, self), delay)
- def __add_callback(self, delay, handle):
+ def _add_callback(self, handle, delay=0):
def upon_timeout():
self.__timers.remove(timer)
self._logger.debug('Callback timer fired, calling {}'.format(
handle))
handle._run()
+ self._logger.debug('Adding callback {} with delay {}'.format(handle, delay))
timer = QtCore.QTimer(self.__app)
timer.timeout.connect(upon_timeout)
timer.setSingleShot(True)
@@ -398,7 +375,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
context parameter has the same meaning as in
`call_exception_handler()`.
"""
- self._logger.debug('Executing default exception handler')
+ self._logger.debug('Default exception handler executing')
message = context.get('message')
if not message:
message = 'Unhandled exception in event loop'
@@ -466,30 +443,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
finally:
asyncio.set_event_loop(None)
- def __on_events(self, events):
- for f, callback, transferred, key, ov in events:
- try:
- self._logger.debug('Invoking event callback {}'.format(callback))
- value = callback(transferred, key, ov)
- except OSError as e:
- self._logger.warn('Event callback failed: {}'.format(e))
- f.set_exception(e)
- else:
- f.set_result(value)
-
- def __handler_helper(self, target, *args):
- semaphore = threading.Semaphore(0)
- handler = None
-
- def helper_target():
- nonlocal handler
- handler = target(*args)
- semaphore.release()
-
- # self.__io_event_loop.call_soon_threadsafe(helper_target)
- with semaphore:
- return handler
-
def __start_io_event_loop(self):
"""Start the I/O event loop which we defer to for performing I/O on another thread.
"""
diff --git a/quamash/_common.py b/quamash/_common.py
index be9d792..b3fbcb0 100644
--- a/quamash/_common.py
+++ b/quamash/_common.py
@@ -1,4 +1,9 @@
import logging
+try:
+ from PySide import QtCore
+except ImportError:
+ from PyQt5 import QtCore
+ QtCore.Signal = QtCore.pyqtSignal
def with_logger(cls):
diff --git a/quamash/_unix.py b/quamash/_unix.py
index 139def0..d840ba5 100644
--- a/quamash/_unix.py
+++ b/quamash/_unix.py
@@ -1,6 +1,224 @@
import asyncio
from asyncio import selectors
+import collections
-baseclass = asyncio.SelectorEventLoop
+from ._common import QtCore, with_logger
-selector_cls = selectors.DefaultSelector
+
+EVENT_READ = (1 << 0)
+EVENT_WRITE = (1 << 1)
+
+
+def _fileobj_to_fd(fileobj):
+ """Return a file descriptor from a file object.
+
+ Parameters:
+ fileobj -- file object or file descriptor
+
+ Returns:
+ corresponding file descriptor
+
+ Raises:
+ ValueError if the object is invalid
+ """
+ if isinstance(fileobj, int):
+ fd = fileobj
+ else:
+ try:
+ fd = int(fileobj.fileno())
+ except (AttributeError, TypeError, ValueError):
+ raise ValueError("Invalid file object: {!r}".format(fileobj)) from None
+ if fd < 0:
+ raise ValueError("Invalid file descriptor: {}".format(fd))
+ return fd
+
+
+class _SelectorMapping(collections.Mapping):
+ """Mapping of file objects to selector keys."""
+ def __init__(self, selector):
+ self._selector = selector
+
+ def __len__(self):
+ return len(self._selector._fd_to_key)
+
+ def __getitem__(self, fileobj):
+ try:
+ fd = self._selector._fileobj_lookup(fileobj)
+ return self._selector._fd_to_key[fd]
+ except KeyError:
+ raise KeyError("{!r} is not registered".format(fileobj)) from None
+
+ def __iter__(self):
+ return iter(self._selector._fd_to_key)
+
+
+@with_logger
+class _Selector(selectors.BaseSelector):
+ def __init__(self, parent):
+ # this maps file descriptors to keys
+ self._fd_to_key = {}
+ # read-only mapping returned by get_map()
+ self.__map = _SelectorMapping(self)
+ self.__read_notifiers = {}
+ self.__write_notifiers = {}
+ self.__parent = parent
+
+ def select(self, *args, **kwargs):
+ """Implement abstract method even though we don't need it."""
+ raise NotImplemented
+
+ def _fileobj_lookup(self, fileobj):
+ """Return a file descriptor from a file object.
+
+ This wraps _fileobj_to_fd() to do an exhaustive search in case
+ the object is invalid but we still have it in our map. This
+ is used by unregister() so we can unregister an object that
+ was previously registered even if it is closed. It is also
+ used by _SelectorMapping.
+ """
+ try:
+ return _fileobj_to_fd(fileobj)
+ except ValueError:
+ # Do an exhaustive search.
+ for key in self._fd_to_key.values():
+ if key.fileobj is fileobj:
+ return key.fd
+ # Raise ValueError after all.
+ raise
+
+ def register(self, fileobj, events, data=None):
+ if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
+ raise ValueError("Invalid events: {!r}".format(events))
+
+ key = selectors.SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
+
+ if key.fd in self._fd_to_key:
+ raise KeyError("{!r} (FD {}) is already registered"
+ .format(fileobj, key.fd))
+
+ self._fd_to_key[key.fd] = key
+
+ if events & EVENT_READ:
+ notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Read)
+ notifier.activated.connect(self.__on_read_activated)
+ self.__read_notifiers[key.fd] = notifier
+ if events & EVENT_WRITE:
+ notifier = QtCore.QSocketNotifier(key.fd, QtCore.QSocketNotifier.Write)
+ notifier.activated.connect(self.__on_write_activated)
+ self.__write_notifiers[key.fd] = notifier
+
+ return key
+
+ def __on_read_activated(self, fd):
+ self._logger.debug('File {} ready to read'.format(fd))
+ key = self._key_from_fd(fd)
+ if key:
+ self.__parent._process_event(key, EVENT_READ & key.events)
+
+ def __on_write_activated(self, fd):
+ self._logger.debug('File {} ready to write'.format(fd))
+ key = self._key_from_fd(fd)
+ if key:
+ self.__parent._process_event(key, EVENT_WRITE & key.events)
+
+ def unregister(self, fileobj):
+ try:
+ key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
+ except KeyError:
+ raise KeyError("{!r} is not registered".format(fileobj)) from None
+
+ try:
+ del self.__read_notifiers[key.fd]
+ except KeyError:
+ pass
+ try:
+ del self.__write_notifiers[key.fd]
+ except KeyError:
+ pass
+
+ return key
+
+ def modify(self, fileobj, events, data=None):
+ try:
+ key = self._fd_to_key[self._fileobj_lookup(fileobj)]
+ except KeyError:
+ raise KeyError("{!r} is not registered".format(fileobj)) from None
+ if events != key.events:
+ self.unregister(fileobj)
+ key = self.register(fileobj, events, data)
+ elif data != key.data:
+ # Use a shortcut to update the data.
+ key = key._replace(data=data)
+ self._fd_to_key[key.fd] = key
+ return key
+
+ def close(self):
+ self._logger.debug('Closing')
+ self._fd_to_key.clear()
+ self.__read_notifiers.clear()
+ self.__write_notifiers.clear()
+
+ def get_map(self):
+ return self.__map
+
+ def _key_from_fd(self, fd):
+ """Return the key associated to a given file descriptor.
+
+ Parameters:
+ fd -- file descriptor
+
+ Returns:
+ corresponding key, or None if not found
+ """
+ try:
+ return self._fd_to_key[fd]
+ except KeyError:
+ return None
+
+
+class _SelectorEventLoop(asyncio.SelectorEventLoop):
+ def __init__(self):
+ self._signal_safe_callbacks = []
+
+ selector = _Selector(self)
+ asyncio.SelectorEventLoop.__init__(self, selector)
+
+ socket_notifier = self.__socket_notifier = QtCore.QSocketNotifier(
+ self._ssock.fileno(), QtCore.QSocketNotifier.Read)
+ socket_notifier.activated.connect(self.__wake_on_socket)
+
+ def _process_event(self, key, mask):
+ """Selector has delivered us an event."""
+ self._logger.debug('Processing event with key {} and mask {}'.format(key, mask))
+ fileobj, (reader, writer) = key.fileobj, key.data
+ if mask & selectors.EVENT_READ and reader is not None:
+ if reader._cancelled:
+ self.remove_reader(fileobj)
+ else:
+ self._logger.debug('Invoking reader callback: {}'.format(reader))
+ reader._run()
+ if mask & selectors.EVENT_WRITE and writer is not None:
+ if writer._cancelled:
+ self.remove_writer(fileobj)
+ else:
+ self._logger.debug('Invoking writer callback: {}'.format(writer))
+ writer._run()
+
+ def _add_callback_signalsafe(self, handle):
+ """Add callback in signal safe manner."""
+ self._signal_safe_callbacks.append(handle)
+ self._write_to_self()
+
+ def __wake_on_socket(self):
+ self._logger.debug('Waking on socket notification, {} signal callback(s) waiting'.format(
+ len(self._signal_safe_callbacks)
+ ))
+ # Acknowledge command
+ self._ssock.recv(1)
+ for handle in self._signal_safe_callbacks[:]:
+ self._logger.debug('Scheduling signal callback {}'.format(handle))
+ self._signal_safe_callbacks.remove(handle)
+ self._add_callback(handle)
+
+
+baseclass = _SelectorEventLoop
diff --git a/quamash/_windows.py b/quamash/_windows.py
index ca36be6..85860b5 100644
--- a/quamash/_windows.py
+++ b/quamash/_windows.py
@@ -5,7 +5,25 @@ from asyncio import windows_events
from asyncio import _overlapped
import math
-from ._common import with_logger
+from ._common import with_logger, QtCore
+
+
+class ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
+ def __init__(self):
+ QtCore.QObject.__init__(self)
+ asyncio.ProactorEventLoop.__init__(self, _IocpProactor())
+
+ def _process_events(self, events):
+ for f, callback, transferred, key, ov in events:
+ try:
+ self._logger.debug('Invoking event callback {}'.format(callback))
+ value = callback(transferred, key, ov)
+ except OSError as e:
+ self._logger.warn('Event callback failed: {}'.format(e))
+ f.set_exception(e)
+ else:
+ f.set_result(value)
+
baseclass = asyncio.ProactorEventLoop
@@ -67,5 +85,3 @@ class _IocpProactor(windows_events.IocpProactor):
self.__events.append((f, callback, transferred, key, ov))
ms = 0
-
-selector_cls = _IocpProactor
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list