[Pkg-bitcoin-commits] [python-quamash] 25/269: Implement subprocess_exec

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:12 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit 08fc7ce1f12753651db82533b7c7ce0b5b4c52d0
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date:   Tue Jul 1 15:10:38 2014 +0200

    Implement subprocess_exec
---
 README                   |  11 +++-
 quamash/__init__.py      | 162 ++++++++++++++++++++++++++++++++++++++++++-----
 tests/test_qeventloop.py |  22 +++++++
 3 files changed, 178 insertions(+), 17 deletions(-)

diff --git a/README b/README
index d765e75..bd2fef8 100644
--- a/README
+++ b/README
@@ -5,6 +5,10 @@ Implementation of the `PEP 3156`_ Event-Loop with Qt
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 :author: Mark Harviston <mark.harviston at gmail.com>, Arve Knudsen <arve.knudsen at gmail.com>
 
+Requirements
+============
+Quamash requires Python 3.4 and either PyQt 5 or PySide.
+
 Usage
 =====
 
@@ -46,9 +50,14 @@ Usage
         with loop:
             loop.run_until_complete(my_task())
 
+Testing
+=======
+Quamash is tested with pytest; in order to run the test suite, just execute py.test on the
+commandline. The tests themselves are beneath the 'tests' directory.
 
 Name
 ====
-Tulip related projects are being named after other flowers, Quamash is one of the few flowers that starts with a "Q".
+Tulip related projects are being named after other flowers, Quamash is one of the few flowers that
+starts with a "Q".
 
 .. _`PEP 3156`: http://legacy.python.org/dev/peps/pep-3156/
diff --git a/quamash/__init__.py b/quamash/__init__.py
index d89e5bc..d0b4a02 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -13,6 +13,7 @@ __license__ = 'BSD 2 Clause License'
 import sys
 import os
 import asyncio
+from asyncio import windows_events
 import time
 from functools import wraps
 import logging
@@ -41,6 +42,101 @@ def _with_logger(cls):
 
 
 @_with_logger
+class _IocpProactor(windows_events.IocpProactor):
+    def __init__(self):
+        self.__events = []
+        super(_IocpProactor, self).__init__()
+
+    def select(self, timeout=None):
+        """Override in order to handle events in a threadsafe manner."""
+        if not self.__events:
+            self._poll(timeout)
+        tmp = self.__events
+        self.__events = []
+        return tmp
+
+    def close(self):
+        self._logger.debug('Closing')
+        super(_IocpProactor, self).close()
+
+    def _poll(self, timeout=None):
+        """Override in order to handle events in a threadsafe manner."""
+        import math
+        from asyncio import _overlapped
+        INFINITE = 0xffffffff
+
+        if timeout is None:
+            ms = INFINITE
+        elif timeout < 0:
+            raise ValueError("negative timeout")
+        else:
+            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
+            # round away from zero to wait *at least* timeout seconds.
+            ms = math.ceil(timeout * 1e3)
+            if ms >= INFINITE:
+                raise ValueError("timeout too big")
+
+        while True:
+            self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
+                ms, threading.get_ident()))
+            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
+
+            if status is None:
+                break
+            err, transferred, key, address = status
+            try:
+                f, ov, obj, callback = self._cache.pop(address)
+            except KeyError:
+                # key is either zero, or it is used to return a pipe
+                # handle which should be closed to avoid a leak.
+                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
+                    _winapi.CloseHandle(key)
+                ms = 0
+                continue
+
+            if obj in self._stopped_serving:
+                f.cancel()
+            elif not f.cancelled():
+                self.__events.append((f, callback, transferred, key, ov))
+
+            ms = 0
+
+
+@_with_logger
+class _EventPoller(QtCore.QObject):
+    """Polling of events in separate thread."""
+    sig_events = QtCore.Signal(list)
+    
+    def __init__(self, selector):
+        super(_EventPoller, self).__init__()
+        self.__semaphore = threading.Semaphore(0)
+        self.__selector = selector
+
+    def start(self):
+        self.__canceled = False
+        self._logger.debug('Starting')
+        threading.Thread(target=self.__run).start()
+        # Wait for thread to start
+        self.__semaphore.acquire()
+
+    def stop(self):
+        self._logger.debug('Stopping')
+        self.__canceled = True
+        # Wait for thread to end
+        self.__semaphore.acquire()
+
+    def __run(self):
+        self.__semaphore.release()
+
+        while not self.__canceled:
+            events = self.__selector.select(0.1)
+            if events:
+                self.sig_events.emit(events)
+
+        self.__semaphore.release()
+
+
+@_with_logger
 class _QThreadWorker(QtCore.QThread):
     """
     Read from the queue.
@@ -152,7 +248,11 @@ def _easycallback(fn):
     return in_wrapper
 
 
-_baseclass = asyncio.ProactorEventLoop if os.name == 'nt' else asyncio.SelectorEventLoop
+if os.name == 'nt':
+    _baseclass = asyncio.ProactorEventLoop
+    import _winapi
+else:
+    _baseclass = asyncio.SelectorEventLoop
 
 
 @_with_logger
@@ -176,22 +276,39 @@ class QEventLoop(QtCore.QObject, _baseclass):
         self.__debug_enabled = False
         self.__default_executor = None
         self.__exception_handler = None
-        
-        super().__init__()
 
-        self.__start_io_event_loop()
+        super(QEventLoop, self).__init__()
+        baseclass_args = (_IocpProactor(),) if os.name == 'nt' else ()
+        _baseclass.__init__(self, *baseclass_args)
+
+        self.__event_poller = _EventPoller(self._selector)
+        self.__event_poller.sig_events.connect(self.__on_events)
 
     def run_forever(self):
         """Run eventloop forever."""
+        def stop_io_event_loop():
+            self.__io_event_loop.stop()
+            self._logger.debug('IO event loop stopped')
+            semaphore.release()
+
+        self.__start_io_event_loop()
+
+        semaphore = threading.Semaphore(0)
         self.__is_running = True
         self._logger.debug('Starting Qt event loop')
         try:
+            self._logger.debug('Starting event poller')
+            self.__event_poller.start()
             rslt = self.__app.exec_()
             self._logger.debug('Qt event loop ended with result {}'.format(rslt))
             return rslt
         finally:
-            self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
-            self.__is_running = False
+            self._logger.debug('Stopping event poller')
+            self.__event_poller.stop()
+            self._logger.debug('Stopping IO event loop...')
+            self.__io_event_loop.call_soon_threadsafe(stop_io_event_loop)
+            with semaphore:
+                self.__is_running = False
 
     def run_until_complete(self, future):
         """Run until Future is complete."""
@@ -239,7 +356,8 @@ class QEventLoop(QtCore.QObject, _baseclass):
             raise TypeError('callback must be callable: {}'.format(type(callback).__name__))
 
         self._logger.debug(
-            'Registering callback {} to be invoked with arguments {} after {} second(s)'.format(
+            'Registering callback {} to be invoked with arguments {} after {} second(s)'
+            .format(
                 callback, args, delay
             ))
 
@@ -380,6 +498,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
         return self.__debug_enabled
 
     def set_debug(self, enabled):
+        super(QEventLoop, self).set_debug(enabled)
         self.__debug_enabled = enabled
 
     def __enter__(self):
@@ -393,27 +512,38 @@ class QEventLoop(QtCore.QObject, _baseclass):
         finally:
             asyncio.set_event_loop(None)
 
+    def __on_events(self, events):
+        for f, callback, transferred, key, ov in events:
+            try:
+                self._logger.debug('Invoking event callback {}'.format(callback))
+                value = callback(transferred, key, ov)
+            except OSError as e:
+                self._logger.warn('Event callback failed: {}'.format(e))
+                f.set_exception(e)
+            else:
+                f.set_result(value)
+
     def __handler_helper(self, target, *args):
-        lock = threading.Lock()
-        lock.acquire()
+        semaphore = threading.Semaphore(0)
         handler = None
 
         def helper_target():
             nonlocal handler
             handler = target(*args)
-            lock.release()
+            semaphore.release()
 
         self.__io_event_loop.call_soon_threadsafe(helper_target)
-        lock.acquire()
-        return handler
+        with semaphore:
+            return handler
 
     def __start_io_event_loop(self):
         """Start the I/O event loop which we defer to for performing I/O on another thread.
         """
-        self.__event_loop_started = threading.Lock()
-        self.__event_loop_started.acquire()
-        threading.Thread(None, self.__io_event_loop_thread).start()
-        self.__event_loop_started.acquire()
+        self._logger.debug('Starting IO event loop...')
+        self.__event_loop_started = threading.Semaphore(0)
+        threading.Thread(target=self.__io_event_loop_thread).start()
+        with self.__event_loop_started:
+            self._logger.debug('IO event loop started')
 
     def __io_event_loop_thread(self):
         """Worker thread for running the I/O event loop."""
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index d9ff424..4259314 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -2,6 +2,7 @@ import asyncio
 import os.path
 import logging
 import sys
+import locale
 try:
     from PyQt5.QtWidgets import QApplication
 except ImportError:
@@ -16,6 +17,20 @@ logging.basicConfig(level=logging.DEBUG,
                     format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
 
 
+class _SubprocessProtocol(asyncio.SubprocessProtocol):
+    def __init__(self, *args, **kwds):
+        super(_SubprocessProtocol, self).__init__(*args, **kwds)
+        self.received_stdout = None
+
+    def pipe_data_received(self, fd, data):
+        text = data.decode(locale.getpreferredencoding(False))
+        if fd == 1:
+            self.received_stdout = text.strip()
+
+    def process_exited(self):
+        asyncio.get_event_loop().stop()
+
+
 @pytest.fixture
 def loop(request):
     app = QApplication([])
@@ -48,6 +63,13 @@ class TestQEventLoop:
 
         assert was_invoked
 
+    def test_can_execute_subprocess(self, loop):
+        transport, protocol = loop.run_until_complete(loop.subprocess_exec(
+            _SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
+        loop.run_forever()
+        assert transport.get_returncode() == 0
+        assert protocol.received_stdout == 'Hello async world!'
+
     def test_can_function_as_context_manager(self):
         app = QApplication([])
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list