[Pkg-bitcoin-commits] [python-quamash] 14/269: Fix event loop close issues

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:11 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit 1ba690d077d02ba8e3bf55b324d47ff2bb500a6b
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date:   Mon Jun 30 09:56:28 2014 +0200

    Fix event loop close issues
---
 quamash/__init__.py | 233 ++++++++++++----------------------------------------
 1 file changed, 53 insertions(+), 180 deletions(-)

diff --git a/quamash/__init__.py b/quamash/__init__.py
index 8024143..68f598a 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -39,19 +39,19 @@ class _QThreadWorker(QtCore.QThread):
     For use by the QThreadExecutor
     """
     def __init__(self, queue):
-        self.queue = queue
-        self.STOP = False
+        self.__queue = queue
+        self.__stop = False
         super().__init__()
 
     def run(self):
-        while not self.STOP:
-            future, fn, args, kwargs = self.queue.get()
+        while not self.__stop:
+            future, fn, args, kwargs = self.__queue.get()
             if future.set_running_or_notify_cancel():
                 r = fn(*args, **kwargs)
                 future.set_result(r)
 
     def stop(self):
-        self.STOP = True
+        self.__stop = True
 
 
 class QThreadExecutor(QtCore.QObject):
@@ -116,9 +116,6 @@ def _easycallback(fn):
     >>>     loop.call_soon(mytask)
     >>>     loop.run_forever()
     """
-    def out_wrapper(self, args, kwargs):
-        return fn(self, *args, **kwargs)
-
     @wraps(fn)
     def in_wrapper(self, *args, **kwargs):
         return signaler.signal.emit(self, args, kwargs)
@@ -127,7 +124,7 @@ def _easycallback(fn):
         signal = QtCore.pyqtSignal(object, tuple, dict)
 
     signaler = Signaler()
-    signaler.signal.connect(out_wrapper)
+    signaler.signal.connect(lambda self, args, kwargs: fn(self, *args, **kwargs))
     return in_wrapper
 
 
@@ -148,31 +145,38 @@ class QEventLoop(QtCore.QObject, _baseclass):
     >>>     assert y == 4
     """
     def __init__(self, app):
-        self.__start_io_event_loop()
         self.__timers = []
         self.__app = app
         self.__is_running = False
         self.__debug_enabled = False
         self.__default_executor = None
         self.__exception_handler = None
-
+        
         super().__init__()
 
+        self.__start_io_event_loop()
+
     def run_forever(self):
+        """Run eventloop forever."""
         self.__is_running = True
         _logger.debug('Starting Qt event loop')
         try:
             rslt = self.__app.exec_()
+            _logger.debug('Qt event loop ended with result {}'.format(rslt))
             return rslt
         finally:
+            self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
+            super(QEventLoop, self).stop()
             self.__is_running = False
 
     def run_until_complete(self, future):
         """Run until Future is complete."""
         future = asyncio.async(future, loop=self)
         future.add_done_callback(self.stop)
-        self.run_forever()
-        future.remove_done_callback(self.stop)
+        try:
+            self.run_forever()
+        finally:
+            future.remove_done_callback(self.stop)
         if not future.done():
             raise RuntimeError('Event loop stopped before Future completed.')
 
@@ -181,7 +185,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
     def stop(self):
         """Stop event loop."""
         _logger.debug('Stopping eventloop...')
-        self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
         self.__app.exit()
         _logger.debug('Stopped eventloop')
 
@@ -191,9 +194,11 @@ class QEventLoop(QtCore.QObject, _baseclass):
 
     def close(self):
         """Close event loop."""
-        self.stop()
         self.__timers = []
         self.__app = None
+        if self.__default_executor is not None:
+            self.__default_executor.close()
+        super(QEventLoop, self).close()
 
     def call_later(self, delay, callback, *args):
         """Register callback to be invoked after a certain delay."""
@@ -204,9 +209,12 @@ class QEventLoop(QtCore.QObject, _baseclass):
             callback, delay
         ))
 
+        def upon_timeout():
+            self.__timers.remove(timer)
+            callback(*args)
+
         timer = QtCore.QTimer(self.__app)
-        timer.timeout.connect(lambda: callback(*args))
-        timer.timeout.connect(lambda: self.__timers.remove(timer))
+        timer.timeout.connect(upon_timeout)
         timer.setSingleShot(True)
         timer.start(delay * 1000)
         self.__timers.append(timer)
@@ -245,153 +253,13 @@ class QEventLoop(QtCore.QObject, _baseclass):
             executor = self.__default_executor
             if executor is None:
                 executor = self.__default_executor = QThreadExecutor()
-        return self.wrap_future(executor.submit(callback, *args))
+        return asyncio.wrap_future(executor.submit(callback, *args))
 
     def set_default_executor(self, executor):
         self.__default_executor = executor
 
-    # Network I/O methods returning Futures.
-
-    def getaddrinfo(self, host, port, *, family=0, type_=0, proto=0, flags=0):
-        return self._io_helper(
-            self.__io_event_loop.getaddrinfo, (host, port), {
-                'family': family, 'type': type_, 'proto': proto, 'flags': flags,
-            })
-
-    def getnameinfo(self, sockaddr, flags=0):
-        return self._io_helper(self.__io_event_loop.getnameinfo, (sockaddr, flags), {})
-
-    def create_connection(self, protocol_factory, host=None, port=None, *,
-                          ssl=None, family=0, proto=0, flags=0, sock=None,
-                          local_addr=None, server_hostname=None):
-        raise NotImplementedError
-
-    def create_connection(
-            self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0,
-            sock=None, local_addr=None, server_hostname=None
-    ):
-        return self._io_helper(
-            self.__io_event_loop.create_connection,
-            (protocol_factory, host, port), {
-                'family': family, 'proto': proto, 'flags': flags, 'sock': sock
-            })
-
-    def create_server(self, protocol_factory, host=None, port=None, *,
-                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
-                      sock=None, backlog=100, ssl=None, reuse_address=None):
-        """A coroutine which creates a TCP server bound to host and port.
-
-        The return value is a Server object which can be used to stop
-        the service.
-
-        If host is an empty string or None all interfaces are assumed
-        and a list of multiple sockets will be returned (most likely
-        one for IPv4 and another one for IPv6).
-
-        family can be set to either AF_INET or AF_INET6 to force the
-        socket to use IPv4 or IPv6. If not set it will be determined
-        from host (defaults to AF_UNSPEC).
-
-        flags is a bitmask for getaddrinfo().
-
-        sock can optionally be specified in order to use a preexisting
-        socket object.
-
-        backlog is the maximum number of queued connections passed to
-        listen() (defaults to 100).
-
-        ssl can be set to an SSLContext to enable SSL over the
-        accepted connections.
-
-        reuse_address tells the kernel to reuse a local socket in
-        TIME_WAIT state, without waiting for its natural timeout to
-        expire. If not specified will automatically be set to True on
-        UNIX.
-        """
-        raise NotImplementedError
-
-    def create_unix_connection(self, protocol_factory, path, *,
-                               ssl=None, sock=None,
-                               server_hostname=None):
-        raise NotImplementedError
-
-    def create_unix_server(self, protocol_factory, path, *,
-                           sock=None, backlog=100, ssl=None):
-        """A coroutine which creates a UNIX Domain Socket server.
-
-        The return value is a Server object, which can be used to stop
-        the service.
-
-        path is a str, representing a file systsem path to bind the
-        server socket to.
-
-        sock can optionally be specified in order to use a preexisting
-        socket object.
-
-        backlog is the maximum number of queued connections passed to
-        listen() (defaults to 100).
-
-        ssl can be set to an SSLContext to enable SSL over the
-        accepted connections.
-        """
-        raise NotImplementedError
-
-    def create_datagram_endpoint(self, protocol_factory,
-                                 local_addr=None, remote_addr=None, *,
-                                 family=0, proto=0, flags=0):
-        raise NotImplementedError
-
-    # Ready-based callback registration methods.
-    # The add_*() methods return None.
-    # The remove_*() methods return True if something was removed,
-    # False if there was nothing to delete.
-
-    def add_reader(self, fd, callback, *args):
-        return _ready_helper(self.__io_event_loop.add_reader, fd, callback, *args)
-
-    def remove_reader(self, fd):
-        return _ready_helper(self.__io_event_loop.remove_reader, fd)
-
-    def add_writer(self, fd, callback, *args):
-        return _ready_helper(self.__io_event_loop.add_writer, fd, callback, *args)
-
-    def remove_writer(self, fd):
-        return _ready_helper(self.__io_event_loop.remove_writer, fd)
-
-    # Completion based I/O methods returning Futures.
-
-    def sock_recv(self, sock, nbytes):
-        return self._io_helper(
-            self.__io_event_loop.sock_recv,
-            (sock, nbytes),
-            {},
-        )
-
-    def sock_sendall(self, sock, data):
-        return self._io_helper(self.__io_event_loop.sock_sendall, (sock, data), {})
-
-    def sock_connect(self, sock, address):
-        return self._io_helper(self.__io_event_loop.sock_connect, (sock, address), {})
-
-    def sock_accept(self, sock):
-        return self._io_helper(self.__io_event_loop.sock_accept, (sock, ), {})
-
     # Signal handling.
 
-    def __handler_helper(self, target, *args):
-        lock = threading.Lock()
-        lock.acquire()
-        handler = None
-
-        def helper_target():
-            nonlocal handler
-            handler = target(*args)
-            lock.release()
-
-        self.__io_event_loop.call_soon_threadsafe(helper_target)
-        lock.acquire()
-        return handler
-
     def add_signal_handler(self, sig, callback, *args):
         return self.__handler_helper(self.add_signal_handler, sig, callback, *args)
 
@@ -418,16 +286,15 @@ class QEventLoop(QtCore.QObject, _baseclass):
         if not message:
             message = 'Unhandled exception in event loop'
 
-        exception = context.get('exception')
-        if exception is not None:
-            exc_info = (type(exception), exception, exception.__traceback__)
-        else:
+        try:
+            exception = context['exception']
+        except KeyError:
             exc_info = False
+        else:
+            exc_info = (type(exception), exception, exception.__traceback__)
 
         log_lines = [message]
-        for key in sorted(context):
-            if key in {'message', 'exception'}:
-                continue
+        for key in [k for k in sorted(context) if k not in {'message', 'exception'}]:
             log_lines.append('{}: {!r}'.format(key, context[key]))
 
         self.__log_error('\n'.join(log_lines), exc_info=exc_info)
@@ -451,7 +318,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
             try:
                 # Let's try the default handler.
                 self.default_exception_handler({
-                    'message': 'Unhandled error in exception handler',
+                    'message': 'Unhandled error in custom exception handler',
                     'exception': exc,
                     'context': context,
                 })
@@ -469,7 +336,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
 
     def set_debug(self, enabled):
         self.__debug_enabled = enabled
-        return self.run_forever()
 
     def __enter__(self):
         asyncio.set_event_loop(self)
@@ -477,10 +343,23 @@ class QEventLoop(QtCore.QObject, _baseclass):
     def __exit__(self, *args):
         try:
             self.stop()
+            self.close()
         finally:
             asyncio.set_event_loop(None)
-            if self.__default_executor is not None:
-                self.__default_executor.close()
+
+    def __handler_helper(self, target, *args):
+        lock = threading.Lock()
+        lock.acquire()
+        handler = None
+
+        def helper_target():
+            nonlocal handler
+            handler = target(*args)
+            lock.release()
+
+        self.__io_event_loop.call_soon_threadsafe(helper_target)
+        lock.acquire()
+        return handler
 
     def __start_io_event_loop(self):
         """Start the I/O event loop which we defer to for performing I/O on another thread.
@@ -494,10 +373,10 @@ class QEventLoop(QtCore.QObject, _baseclass):
         """Worker thread for running the I/O event loop."""
         io_event_loop = asyncio.get_event_loop_policy().new_event_loop()
         assert isinstance(io_event_loop, asyncio.AbstractEventLoop)
-        io_event_loop.set_debug(True)
+        io_event_loop.set_debug(self.__debug_enabled)
         asyncio.set_event_loop(io_event_loop)
         self.__io_event_loop = io_event_loop
-        self.__event_loop_started.release()
+        self.__io_event_loop.call_soon(self.__event_loop_started.release)
         self.__io_event_loop.run_forever()
 
     @staticmethod
@@ -506,20 +385,14 @@ class QEventLoop(QtCore.QObject, _baseclass):
         try:
             _logger.error(*args, **kwds)
         except:
-            print(*args)
-            print(kwds['exc_info'])
-            pass
-
-    def __make_write_pipe_transport(self, sock, protocol, waiter=None, extra=None):
-        # We want connection_lost() to be called when other end closes
-        return _ProactorWritePipeTransport(self, sock, protocol, waiter, extra)
+            sys.stderr.write('{}, {}\n'.format(args, kwds))
 
 
-class _Cancellable(object):
+class _Cancellable:
     def __init__(self, timer, loop):
         self.__timer = timer
         self.__loop = loop
 
     def cancel(self):
         self.__loop.remove(timer)
-        return self.__timer.stop()
+        self.__timer.stop()
\ No newline at end of file

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list