[Pkg-bitcoin-commits] [python-quamash] 25/269: Implement subprocess_exec
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:12 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 08fc7ce1f12753651db82533b7c7ce0b5b4c52d0
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Tue Jul 1 15:10:38 2014 +0200
Implement subprocess_exec
---
README | 11 +++-
quamash/__init__.py | 162 ++++++++++++++++++++++++++++++++++++++++++-----
tests/test_qeventloop.py | 22 +++++++
3 files changed, 178 insertions(+), 17 deletions(-)
diff --git a/README b/README
index d765e75..bd2fef8 100644
--- a/README
+++ b/README
@@ -5,6 +5,10 @@ Implementation of the `PEP 3156`_ Event-Loop with Qt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:author: Mark Harviston <mark.harviston at gmail.com>, Arve Knudsen <arve.knudsen at gmail.com>
+Requirements
+============
+Quamash requires Python 3.4 and either PyQt 5 or PySide.
+
Usage
=====
@@ -46,9 +50,14 @@ Usage
with loop:
loop.run_until_complete(my_task())
+Testing
+=======
+Quamash is tested with pytest; in order to run the test suite, just execute py.test on the
+commandline. The tests themselves are beneath the 'tests' directory.
Name
====
-Tulip related projects are being named after other flowers, Quamash is one of the few flowers that starts with a "Q".
+Tulip related projects are being named after other flowers, Quamash is one of the few flowers that
+starts with a "Q".
.. _`PEP 3156`: http://legacy.python.org/dev/peps/pep-3156/
diff --git a/quamash/__init__.py b/quamash/__init__.py
index d89e5bc..d0b4a02 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -13,6 +13,7 @@ __license__ = 'BSD 2 Clause License'
import sys
import os
import asyncio
+from asyncio import windows_events
import time
from functools import wraps
import logging
@@ -41,6 +42,101 @@ def _with_logger(cls):
@_with_logger
+class _IocpProactor(windows_events.IocpProactor):
+ def __init__(self):
+ self.__events = []
+ super(_IocpProactor, self).__init__()
+
+ def select(self, timeout=None):
+ """Override in order to handle events in a threadsafe manner."""
+ if not self.__events:
+ self._poll(timeout)
+ tmp = self.__events
+ self.__events = []
+ return tmp
+
+ def close(self):
+ self._logger.debug('Closing')
+ super(_IocpProactor, self).close()
+
+ def _poll(self, timeout=None):
+ """Override in order to handle events in a threadsafe manner."""
+ import math
+ from asyncio import _overlapped
+ INFINITE = 0xffffffff
+
+ if timeout is None:
+ ms = INFINITE
+ elif timeout < 0:
+ raise ValueError("negative timeout")
+ else:
+ # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
+ # round away from zero to wait *at least* timeout seconds.
+ ms = math.ceil(timeout * 1e3)
+ if ms >= INFINITE:
+ raise ValueError("timeout too big")
+
+ while True:
+ self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
+ ms, threading.get_ident()))
+ status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
+
+ if status is None:
+ break
+ err, transferred, key, address = status
+ try:
+ f, ov, obj, callback = self._cache.pop(address)
+ except KeyError:
+ # key is either zero, or it is used to return a pipe
+ # handle which should be closed to avoid a leak.
+ if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
+ _winapi.CloseHandle(key)
+ ms = 0
+ continue
+
+ if obj in self._stopped_serving:
+ f.cancel()
+ elif not f.cancelled():
+ self.__events.append((f, callback, transferred, key, ov))
+
+ ms = 0
+
+
+@_with_logger
+class _EventPoller(QtCore.QObject):
+ """Polling of events in separate thread."""
+ sig_events = QtCore.Signal(list)
+
+ def __init__(self, selector):
+ super(_EventPoller, self).__init__()
+ self.__semaphore = threading.Semaphore(0)
+ self.__selector = selector
+
+ def start(self):
+ self.__canceled = False
+ self._logger.debug('Starting')
+ threading.Thread(target=self.__run).start()
+ # Wait for thread to start
+ self.__semaphore.acquire()
+
+ def stop(self):
+ self._logger.debug('Stopping')
+ self.__canceled = True
+ # Wait for thread to end
+ self.__semaphore.acquire()
+
+ def __run(self):
+ self.__semaphore.release()
+
+ while not self.__canceled:
+ events = self.__selector.select(0.1)
+ if events:
+ self.sig_events.emit(events)
+
+ self.__semaphore.release()
+
+
+@_with_logger
class _QThreadWorker(QtCore.QThread):
"""
Read from the queue.
@@ -152,7 +248,11 @@ def _easycallback(fn):
return in_wrapper
-_baseclass = asyncio.ProactorEventLoop if os.name == 'nt' else asyncio.SelectorEventLoop
+if os.name == 'nt':
+ _baseclass = asyncio.ProactorEventLoop
+ import _winapi
+else:
+ _baseclass = asyncio.SelectorEventLoop
@_with_logger
@@ -176,22 +276,39 @@ class QEventLoop(QtCore.QObject, _baseclass):
self.__debug_enabled = False
self.__default_executor = None
self.__exception_handler = None
-
- super().__init__()
- self.__start_io_event_loop()
+ super(QEventLoop, self).__init__()
+ baseclass_args = (_IocpProactor(),) if os.name == 'nt' else ()
+ _baseclass.__init__(self, *baseclass_args)
+
+ self.__event_poller = _EventPoller(self._selector)
+ self.__event_poller.sig_events.connect(self.__on_events)
def run_forever(self):
"""Run eventloop forever."""
+ def stop_io_event_loop():
+ self.__io_event_loop.stop()
+ self._logger.debug('IO event loop stopped')
+ semaphore.release()
+
+ self.__start_io_event_loop()
+
+ semaphore = threading.Semaphore(0)
self.__is_running = True
self._logger.debug('Starting Qt event loop')
try:
+ self._logger.debug('Starting event poller')
+ self.__event_poller.start()
rslt = self.__app.exec_()
self._logger.debug('Qt event loop ended with result {}'.format(rslt))
return rslt
finally:
- self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
- self.__is_running = False
+ self._logger.debug('Stopping event poller')
+ self.__event_poller.stop()
+ self._logger.debug('Stopping IO event loop...')
+ self.__io_event_loop.call_soon_threadsafe(stop_io_event_loop)
+ with semaphore:
+ self.__is_running = False
def run_until_complete(self, future):
"""Run until Future is complete."""
@@ -239,7 +356,8 @@ class QEventLoop(QtCore.QObject, _baseclass):
raise TypeError('callback must be callable: {}'.format(type(callback).__name__))
self._logger.debug(
- 'Registering callback {} to be invoked with arguments {} after {} second(s)'.format(
+ 'Registering callback {} to be invoked with arguments {} after {} second(s)'
+ .format(
callback, args, delay
))
@@ -380,6 +498,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
return self.__debug_enabled
def set_debug(self, enabled):
+ super(QEventLoop, self).set_debug(enabled)
self.__debug_enabled = enabled
def __enter__(self):
@@ -393,27 +512,38 @@ class QEventLoop(QtCore.QObject, _baseclass):
finally:
asyncio.set_event_loop(None)
+ def __on_events(self, events):
+ for f, callback, transferred, key, ov in events:
+ try:
+ self._logger.debug('Invoking event callback {}'.format(callback))
+ value = callback(transferred, key, ov)
+ except OSError as e:
+ self._logger.warn('Event callback failed: {}'.format(e))
+ f.set_exception(e)
+ else:
+ f.set_result(value)
+
def __handler_helper(self, target, *args):
- lock = threading.Lock()
- lock.acquire()
+ semaphore = threading.Semaphore(0)
handler = None
def helper_target():
nonlocal handler
handler = target(*args)
- lock.release()
+ semaphore.release()
self.__io_event_loop.call_soon_threadsafe(helper_target)
- lock.acquire()
- return handler
+ with semaphore:
+ return handler
def __start_io_event_loop(self):
"""Start the I/O event loop which we defer to for performing I/O on another thread.
"""
- self.__event_loop_started = threading.Lock()
- self.__event_loop_started.acquire()
- threading.Thread(None, self.__io_event_loop_thread).start()
- self.__event_loop_started.acquire()
+ self._logger.debug('Starting IO event loop...')
+ self.__event_loop_started = threading.Semaphore(0)
+ threading.Thread(target=self.__io_event_loop_thread).start()
+ with self.__event_loop_started:
+ self._logger.debug('IO event loop started')
def __io_event_loop_thread(self):
"""Worker thread for running the I/O event loop."""
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index d9ff424..4259314 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -2,6 +2,7 @@ import asyncio
import os.path
import logging
import sys
+import locale
try:
from PyQt5.QtWidgets import QApplication
except ImportError:
@@ -16,6 +17,20 @@ logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+class _SubprocessProtocol(asyncio.SubprocessProtocol):
+ def __init__(self, *args, **kwds):
+ super(_SubprocessProtocol, self).__init__(*args, **kwds)
+ self.received_stdout = None
+
+ def pipe_data_received(self, fd, data):
+ text = data.decode(locale.getpreferredencoding(False))
+ if fd == 1:
+ self.received_stdout = text.strip()
+
+ def process_exited(self):
+ asyncio.get_event_loop().stop()
+
+
@pytest.fixture
def loop(request):
app = QApplication([])
@@ -48,6 +63,13 @@ class TestQEventLoop:
assert was_invoked
+ def test_can_execute_subprocess(self, loop):
+ transport, protocol = loop.run_until_complete(loop.subprocess_exec(
+ _SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
+ loop.run_forever()
+ assert transport.get_returncode() == 0
+ assert protocol.received_stdout == 'Hello async world!'
+
def test_can_function_as_context_manager(self):
app = QApplication([])
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list