[Pkg-bitcoin-commits] [python-quamash] 14/269: Fix event loop close issues
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:11 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 1ba690d077d02ba8e3bf55b324d47ff2bb500a6b
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Mon Jun 30 09:56:28 2014 +0200
Fix event loop close issues
---
quamash/__init__.py | 233 ++++++++++++----------------------------------------
1 file changed, 53 insertions(+), 180 deletions(-)
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 8024143..68f598a 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -39,19 +39,19 @@ class _QThreadWorker(QtCore.QThread):
For use by the QThreadExecutor
"""
def __init__(self, queue):
- self.queue = queue
- self.STOP = False
+ self.__queue = queue
+ self.__stop = False
super().__init__()
def run(self):
- while not self.STOP:
- future, fn, args, kwargs = self.queue.get()
+ while not self.__stop:
+ future, fn, args, kwargs = self.__queue.get()
if future.set_running_or_notify_cancel():
r = fn(*args, **kwargs)
future.set_result(r)
def stop(self):
- self.STOP = True
+ self.__stop = True
class QThreadExecutor(QtCore.QObject):
@@ -116,9 +116,6 @@ def _easycallback(fn):
>>> loop.call_soon(mytask)
>>> loop.run_forever()
"""
- def out_wrapper(self, args, kwargs):
- return fn(self, *args, **kwargs)
-
@wraps(fn)
def in_wrapper(self, *args, **kwargs):
return signaler.signal.emit(self, args, kwargs)
@@ -127,7 +124,7 @@ def _easycallback(fn):
signal = QtCore.pyqtSignal(object, tuple, dict)
signaler = Signaler()
- signaler.signal.connect(out_wrapper)
+ signaler.signal.connect(lambda self, args, kwargs: fn(self, *args, **kwargs))
return in_wrapper
@@ -148,31 +145,38 @@ class QEventLoop(QtCore.QObject, _baseclass):
>>> assert y == 4
"""
def __init__(self, app):
- self.__start_io_event_loop()
self.__timers = []
self.__app = app
self.__is_running = False
self.__debug_enabled = False
self.__default_executor = None
self.__exception_handler = None
-
+
super().__init__()
+ self.__start_io_event_loop()
+
def run_forever(self):
+ """Run eventloop forever."""
self.__is_running = True
_logger.debug('Starting Qt event loop')
try:
rslt = self.__app.exec_()
+ _logger.debug('Qt event loop ended with result {}'.format(rslt))
return rslt
finally:
+ self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
+ super(QEventLoop, self).stop()
self.__is_running = False
def run_until_complete(self, future):
"""Run until Future is complete."""
future = asyncio.async(future, loop=self)
future.add_done_callback(self.stop)
- self.run_forever()
- future.remove_done_callback(self.stop)
+ try:
+ self.run_forever()
+ finally:
+ future.remove_done_callback(self.stop)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
@@ -181,7 +185,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
def stop(self):
"""Stop event loop."""
_logger.debug('Stopping eventloop...')
- self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
self.__app.exit()
_logger.debug('Stopped eventloop')
@@ -191,9 +194,11 @@ class QEventLoop(QtCore.QObject, _baseclass):
def close(self):
"""Close event loop."""
- self.stop()
self.__timers = []
self.__app = None
+ if self.__default_executor is not None:
+ self.__default_executor.close()
+ super(QEventLoop, self).close()
def call_later(self, delay, callback, *args):
"""Register callback to be invoked after a certain delay."""
@@ -204,9 +209,12 @@ class QEventLoop(QtCore.QObject, _baseclass):
callback, delay
))
+ def upon_timeout():
+ self.__timers.remove(timer)
+ callback(*args)
+
timer = QtCore.QTimer(self.__app)
- timer.timeout.connect(lambda: callback(*args))
- timer.timeout.connect(lambda: self.__timers.remove(timer))
+ timer.timeout.connect(upon_timeout)
timer.setSingleShot(True)
timer.start(delay * 1000)
self.__timers.append(timer)
@@ -245,153 +253,13 @@ class QEventLoop(QtCore.QObject, _baseclass):
executor = self.__default_executor
if executor is None:
executor = self.__default_executor = QThreadExecutor()
- return self.wrap_future(executor.submit(callback, *args))
+ return asyncio.wrap_future(executor.submit(callback, *args))
def set_default_executor(self, executor):
self.__default_executor = executor
- # Network I/O methods returning Futures.
-
- def getaddrinfo(self, host, port, *, family=0, type_=0, proto=0, flags=0):
- return self._io_helper(
- self.__io_event_loop.getaddrinfo, (host, port), {
- 'family': family, 'type': type_, 'proto': proto, 'flags': flags,
- })
-
- def getnameinfo(self, sockaddr, flags=0):
- return self._io_helper(self.__io_event_loop.getnameinfo, (sockaddr, flags), {})
-
- def create_connection(self, protocol_factory, host=None, port=None, *,
- ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None):
- raise NotImplementedError
-
- def create_connection(
- self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0,
- sock=None, local_addr=None, server_hostname=None
- ):
- return self._io_helper(
- self.__io_event_loop.create_connection,
- (protocol_factory, host, port), {
- 'family': family, 'proto': proto, 'flags': flags, 'sock': sock
- })
-
- def create_server(self, protocol_factory, host=None, port=None, *,
- family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
- sock=None, backlog=100, ssl=None, reuse_address=None):
- """A coroutine which creates a TCP server bound to host and port.
-
- The return value is a Server object which can be used to stop
- the service.
-
- If host is an empty string or None all interfaces are assumed
- and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6).
-
- family can be set to either AF_INET or AF_INET6 to force the
- socket to use IPv4 or IPv6. If not set it will be determined
- from host (defaults to AF_UNSPEC).
-
- flags is a bitmask for getaddrinfo().
-
- sock can optionally be specified in order to use a preexisting
- socket object.
-
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
-
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
-
- reuse_address tells the kernel to reuse a local socket in
- TIME_WAIT state, without waiting for its natural timeout to
- expire. If not specified will automatically be set to True on
- UNIX.
- """
- raise NotImplementedError
-
- def create_unix_connection(self, protocol_factory, path, *,
- ssl=None, sock=None,
- server_hostname=None):
- raise NotImplementedError
-
- def create_unix_server(self, protocol_factory, path, *,
- sock=None, backlog=100, ssl=None):
- """A coroutine which creates a UNIX Domain Socket server.
-
- The return value is a Server object, which can be used to stop
- the service.
-
- path is a str, representing a file systsem path to bind the
- server socket to.
-
- sock can optionally be specified in order to use a preexisting
- socket object.
-
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
-
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
- """
- raise NotImplementedError
-
- def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None, *,
- family=0, proto=0, flags=0):
- raise NotImplementedError
-
- # Ready-based callback registration methods.
- # The add_*() methods return None.
- # The remove_*() methods return True if something was removed,
- # False if there was nothing to delete.
-
- def add_reader(self, fd, callback, *args):
- return _ready_helper(self.__io_event_loop.add_reader, fd, callback, *args)
-
- def remove_reader(self, fd):
- return _ready_helper(self.__io_event_loop.remove_reader, fd)
-
- def add_writer(self, fd, callback, *args):
- return _ready_helper(self.__io_event_loop.add_writer, fd, callback, *args)
-
- def remove_writer(self, fd):
- return _ready_helper(self.__io_event_loop.remove_writer, fd)
-
- # Completion based I/O methods returning Futures.
-
- def sock_recv(self, sock, nbytes):
- return self._io_helper(
- self.__io_event_loop.sock_recv,
- (sock, nbytes),
- {},
- )
-
- def sock_sendall(self, sock, data):
- return self._io_helper(self.__io_event_loop.sock_sendall, (sock, data), {})
-
- def sock_connect(self, sock, address):
- return self._io_helper(self.__io_event_loop.sock_connect, (sock, address), {})
-
- def sock_accept(self, sock):
- return self._io_helper(self.__io_event_loop.sock_accept, (sock, ), {})
-
# Signal handling.
- def __handler_helper(self, target, *args):
- lock = threading.Lock()
- lock.acquire()
- handler = None
-
- def helper_target():
- nonlocal handler
- handler = target(*args)
- lock.release()
-
- self.__io_event_loop.call_soon_threadsafe(helper_target)
- lock.acquire()
- return handler
-
def add_signal_handler(self, sig, callback, *args):
return self.__handler_helper(self.add_signal_handler, sig, callback, *args)
@@ -418,16 +286,15 @@ class QEventLoop(QtCore.QObject, _baseclass):
if not message:
message = 'Unhandled exception in event loop'
- exception = context.get('exception')
- if exception is not None:
- exc_info = (type(exception), exception, exception.__traceback__)
- else:
+ try:
+ exception = context['exception']
+ except KeyError:
exc_info = False
+ else:
+ exc_info = (type(exception), exception, exception.__traceback__)
log_lines = [message]
- for key in sorted(context):
- if key in {'message', 'exception'}:
- continue
+ for key in [k for k in sorted(context) if k not in {'message', 'exception'}]:
log_lines.append('{}: {!r}'.format(key, context[key]))
self.__log_error('\n'.join(log_lines), exc_info=exc_info)
@@ -451,7 +318,7 @@ class QEventLoop(QtCore.QObject, _baseclass):
try:
# Let's try the default handler.
self.default_exception_handler({
- 'message': 'Unhandled error in exception handler',
+ 'message': 'Unhandled error in custom exception handler',
'exception': exc,
'context': context,
})
@@ -469,7 +336,6 @@ class QEventLoop(QtCore.QObject, _baseclass):
def set_debug(self, enabled):
self.__debug_enabled = enabled
- return self.run_forever()
def __enter__(self):
asyncio.set_event_loop(self)
@@ -477,10 +343,23 @@ class QEventLoop(QtCore.QObject, _baseclass):
def __exit__(self, *args):
try:
self.stop()
+ self.close()
finally:
asyncio.set_event_loop(None)
- if self.__default_executor is not None:
- self.__default_executor.close()
+
+ def __handler_helper(self, target, *args):
+ lock = threading.Lock()
+ lock.acquire()
+ handler = None
+
+ def helper_target():
+ nonlocal handler
+ handler = target(*args)
+ lock.release()
+
+ self.__io_event_loop.call_soon_threadsafe(helper_target)
+ lock.acquire()
+ return handler
def __start_io_event_loop(self):
"""Start the I/O event loop which we defer to for performing I/O on another thread.
@@ -494,10 +373,10 @@ class QEventLoop(QtCore.QObject, _baseclass):
"""Worker thread for running the I/O event loop."""
io_event_loop = asyncio.get_event_loop_policy().new_event_loop()
assert isinstance(io_event_loop, asyncio.AbstractEventLoop)
- io_event_loop.set_debug(True)
+ io_event_loop.set_debug(self.__debug_enabled)
asyncio.set_event_loop(io_event_loop)
self.__io_event_loop = io_event_loop
- self.__event_loop_started.release()
+ self.__io_event_loop.call_soon(self.__event_loop_started.release)
self.__io_event_loop.run_forever()
@staticmethod
@@ -506,20 +385,14 @@ class QEventLoop(QtCore.QObject, _baseclass):
try:
_logger.error(*args, **kwds)
except:
- print(*args)
- print(kwds['exc_info'])
- pass
-
- def __make_write_pipe_transport(self, sock, protocol, waiter=None, extra=None):
- # We want connection_lost() to be called when other end closes
- return _ProactorWritePipeTransport(self, sock, protocol, waiter, extra)
+ sys.stderr.write('{}, {}\n'.format(args, kwds))
-class _Cancellable(object):
+class _Cancellable:
def __init__(self, timer, loop):
self.__timer = timer
self.__loop = loop
def cancel(self):
self.__loop.remove(timer)
- return self.__timer.stop()
+ self.__timer.stop()
\ No newline at end of file
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits mailing list