[Pkg-bitcoin-commits] [python-quamash] 12/269: Base implementation on standard ProactorEventLoop/SelectorEventLoop
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:11 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 30fdfa7437ed0ae858d9b2173e2f0671cd11912a
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Fri Jun 27 19:43:31 2014 +0200
Base implementation on standard ProactorEventLoop/SelectorEventLoop
---
quamash/__init__.py | 164 +++++++++++++++++++++++++++++++++-------------------
1 file changed, 103 insertions(+), 61 deletions(-)
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 43802b3..8024143 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -10,8 +10,10 @@ __author__ = 'Mark Harviston <mark.harviston at gmail.com>, Arve Knudsen <arve.knud
__version__ = '0.2'
__license__ = 'BSD 2 Clause License'
+import sys
+import os
import asyncio
-from asyncio import tasks
+from asyncio import futures
import asyncio.events
import socket
import time
@@ -129,7 +131,10 @@ def _easycallback(fn):
return in_wrapper
-class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
+_baseclass = asyncio.ProactorEventLoop if os.name == 'nt' else asyncio.SelectorEventLoop
+
+
+class QEventLoop(QtCore.QObject, _baseclass):
"""
Implementation of asyncio event loop that uses the Qt Event loop
>>> @quamash.task
@@ -143,14 +148,15 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
>>> assert y == 4
"""
def __init__(self, app):
- super().__init__()
-
self.__start_io_event_loop()
self.__timers = []
self.__app = app
self.__is_running = False
self.__debug_enabled = False
self.__default_executor = None
+ self.__exception_handler = None
+
+ super().__init__()
def run_forever(self):
self.__is_running = True
@@ -163,7 +169,7 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
def run_until_complete(self, future):
"""Run until Future is complete."""
- future = tasks.async(future, loop=self)
+ future = asyncio.async(future, loop=self)
future.add_done_callback(self.stop)
self.run_forever()
future.remove_done_callback(self.stop)
@@ -191,7 +197,28 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
def call_later(self, delay, callback, *args):
"""Register callback to be invoked after a certain delay."""
- self.__create_timer(delay, callback, *args)
+ if not callable(callback):
+ raise TypeError('callback must be callable: {}'.format(type(callback).__name__))
+
+ _logger.debug('Invoking callback {} after {} seconds'.format(
+ callback, delay
+ ))
+
+ timer = QtCore.QTimer(self.__app)
+ timer.timeout.connect(lambda: callback(*args))
+ timer.timeout.connect(lambda: self.__timers.remove(timer))
+ timer.setSingleShot(True)
+ timer.start(delay * 1000)
+ self.__timers.append(timer)
+
+ return _Cancellable(timer, self)
+
+ def call_soon(self, callback, *args):
+ self.call_later(0, callback, *args)
+
+ def call_at(self, when, callback, *args):
+ """Register callback to be invoked at a certain time."""
+ self.call_later(when - self.time(), callback, *args)
def time(self):
"""Get time according to event loop's clock."""
@@ -204,10 +231,6 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
"""Thread-safe version of call_soon."""
self.call_soon(callback, *args)
- def call_at(self, when, callback, *args):
- """Register callback to be invoked at a certain time."""
- self.call_later(when - self.time(), callback, *args)
-
def run_in_executor(self, executor, callback, *args):
if isinstance(callback, events.Handle):
assert not args
@@ -318,44 +341,6 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
family=0, proto=0, flags=0):
raise NotImplementedError
- # Pipes and subprocesses.
-
- def connect_read_pipe(self, protocol_factory, pipe):
- """Register read pipe in event loop.
-
- protocol_factory should instantiate object with Protocol interface.
- pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- ReadTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
-
- def connect_write_pipe(self, protocol_factory, pipe):
- """Register write pipe in event loop.
-
- protocol_factory should instantiate object with BaseProtocol interface.
- Pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- WriteTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
-
- def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError
-
- def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError
-
# Ready-based callback registration methods.
# The add_*() methods return None.
# The remove_*() methods return True if something was removed,
@@ -416,13 +401,66 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
# Error handlers.
def set_exception_handler(self, handler):
- raise NotImplementedError
+ self.__exception_handler = handler
def default_exception_handler(self, context):
- raise NotImplementedError
+ """Default exception handler.
+
+ This is called when an exception occurs and no exception
+ handler is set, and can be called by a custom exception
+ handler that wants to defer to the default behavior.
+
+ context parameter has the same meaning as in
+ `call_exception_handler()`.
+ """
+ _logger.debug('Executing default exception handler')
+ message = context.get('message')
+ if not message:
+ message = 'Unhandled exception in event loop'
+
+ exception = context.get('exception')
+ if exception is not None:
+ exc_info = (type(exception), exception, exception.__traceback__)
+ else:
+ exc_info = False
+
+ log_lines = [message]
+ for key in sorted(context):
+ if key in {'message', 'exception'}:
+ continue
+ log_lines.append('{}: {!r}'.format(key, context[key]))
+
+ self.__log_error('\n'.join(log_lines), exc_info=exc_info)
def call_exception_handler(self, context):
- raise NotImplementedError
+ if self.__exception_handler is None:
+ try:
+ self.default_exception_handler(context)
+ except Exception:
+ # Second protection layer for unexpected errors
+ # in the default implementation, as well as for subclassed
+ # event loops with overloaded "default_exception_handler".
+ self.__log_error('Exception in default exception handler', exc_info=True)
+
+ return
+
+ try:
+ self.__exception_handler(self, context)
+ except Exception as exc:
+ # Exception in the user set custom exception handler.
+ try:
+ # Let's try the default handler.
+ self.default_exception_handler({
+ 'message': 'Unhandled error in exception handler',
+ 'exception': exc,
+ 'context': context,
+ })
+ except Exception:
+ # Guard 'default_exception_handler' in case it's
+ # overloaded.
+ self.__log_error(
+ 'Exception in default exception handler while handling an unexpected error '
+ 'in custom exception handler', exc_info=True)
# Debug flag management.
@@ -444,16 +482,6 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
if self.__default_executor is not None:
self.__default_executor.close()
- def __create_timer(self, delay, fn, *args):
- timer = QtCore.QTimer(self.__app)
- timer.timeout.connect(lambda: fn(*args))
- timer.timeout.connect(lambda: self.__timers.remove(timer))
- timer.setSingleShot(True)
- timer.start(delay * 1000)
- self.__timers.append(timer)
-
- return _Cancellable(timer, self)
-
def __start_io_event_loop(self):
"""Start the I/O event loop which we defer to for performing I/O on another thread.
"""
@@ -472,6 +500,20 @@ class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
self.__event_loop_started.release()
self.__io_event_loop.run_forever()
+ @staticmethod
+ def __log_error(*args, **kwds):
+ # In some cases, the error method itself fails, don't have a lot of options in that case
+ try:
+ _logger.error(*args, **kwds)
+ except:
+ print(*args)
+ print(kwds['exc_info'])
+ pass
+
+ def __make_write_pipe_transport(self, sock, protocol, waiter=None, extra=None):
+ # We want connection_lost() to be called when other end closes
+ return _ProactorWritePipeTransport(self, sock, protocol, waiter, extra)
+
class _Cancellable(object):
def __init__(self, timer, loop):
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list