[Pkg-bitcoin-commits] [python-quamash] 08/269: Update to 3.4 asyncio API
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:10 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 71e3cccfb497db2a378aece98853b94933b8ea29
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Fri Jun 27 09:05:49 2014 +0200
Update to 3.4 asyncio API
---
.gitignore | 4 +
.idea/.name | 1 +
.idea/encodings.xml | 5 +
.idea/misc.xml | 5 +
.idea/modules.xml | 9 +
.idea/quamash.iml | 9 +
.idea/scopes/scope_settings.xml | 5 +
.idea/vcs.xml | 7 +
README | 34 ++-
quamash/__init__.py | 637 +++++++++++++++++++++++++++++-----------
quamash/guievents.py | 253 ----------------
11 files changed, 524 insertions(+), 445 deletions(-)
diff --git a/.gitignore b/.gitignore
index d2d6f36..fe92069 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,7 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
+
+# IDEA
+/.idea/workspace.xml
+/.idea/tasks.xml
\ No newline at end of file
diff --git a/.idea/.name b/.idea/.name
new file mode 100644
index 0000000..a17d8ae
--- /dev/null
+++ b/.idea/.name
@@ -0,0 +1 @@
+quamash
\ No newline at end of file
diff --git a/.idea/encodings.xml b/.idea/encodings.xml
new file mode 100644
index 0000000..7c62b52
--- /dev/null
+++ b/.idea/encodings.xml
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="Encoding" useUTFGuessing="true" native2AsciiForPropertiesFiles="false" />
+</project>
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..693bd5d
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.4.1 (C:/Python34/python.exe)" project-jdk-type="Python SDK" />
+</project>
+
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..c3cbaf2
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectModuleManager">
+ <modules>
+ <module fileurl="file://$PROJECT_DIR$/.idea/quamash.iml" filepath="$PROJECT_DIR$/.idea/quamash.iml" />
+ </modules>
+ </component>
+</project>
+
diff --git a/.idea/quamash.iml b/.idea/quamash.iml
new file mode 100644
index 0000000..68c3f23
--- /dev/null
+++ b/.idea/quamash.iml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+ <component name="NewModuleRootManager">
+ <content url="file://$MODULE_DIR$" />
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+</module>
+
diff --git a/.idea/scopes/scope_settings.xml b/.idea/scopes/scope_settings.xml
new file mode 100644
index 0000000..0d5175c
--- /dev/null
+++ b/.idea/scopes/scope_settings.xml
@@ -0,0 +1,5 @@
+<component name="DependencyValidationManager">
+ <state>
+ <option name="SKIP_IMPORT_STATEMENTS" value="false" />
+ </state>
+</component>
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..ab55cf1
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="VcsDirectoryMappings">
+ <mapping directory="$PROJECT_DIR$" vcs="Git" />
+ </component>
+</project>
+
diff --git a/README b/README
index b894600..0ebf045 100644
--- a/README
+++ b/README
@@ -3,7 +3,7 @@ Quamash
=======
Implementation of the PEP 3156 Event-Loop with Qt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-:author: Mark Harviston <mark.harviston at gmail.com>
+:author: Mark Harviston <mark.harviston at gmail.com>, Arve Knudsen <arve.knudsen at gmail.com>
Usage
=====
@@ -11,35 +11,41 @@ Usage
.. code:: python
import quamash
- import tulip
import time
+ import asyncio
+ import sys
+ import logging
- from PyQt4 import QtCore, QtGui
- # - or - #
- from PySide import QtCore, QtGui
+ try:
+ from PyQt5.QtWidgets import QApplication, QMainWindow
+ except ImportError:
+ from PySide.QtGui import QApplication, QMainWindow
def identity(x):
time.sleep(10)
return x
- @quamash.task
- def my_task(loop, executor):
+ @asyncio.coroutine
+ def my_task(loop):
for x in range(5):
- y = yield from loop.run_in_executor(executor, identity, x)
+ y = yield from loop.run_in_executor(None, identity, x)
assert x == y
loop.stop()
+ logging.basicConfig(level=logging.DEBUG)
+ _logger = logging.getLogger('App')
+
if __name__ == '__main__':
- app = QApplication
- loop = quamash.QEventLoop(app)
- executor = quamash.QThreadExecutor(5)
+ app = QApplication(sys.argv)
- win = QtGui.QMainWindow()
+ win = QMainWindow()
win.show()
- with loop, executor:
- loop.call_soon(my_task, loop, executor)
+ loop = quamash.QEventLoop(app)
+ with loop:
+ loop.call_soon(my_task, loop)
+
loop.run_forever()
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 9374fb1..43802b3 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -1,202 +1,483 @@
#!/usr/bin/env python3
# -*- coding=utf-8 -*- #
# © 2013 Mark Harviston <mark.harviston at gmail.com>
+# © 2014 Arve Knudsen <arve.knudsen at gmail.com>
# BSD License
"""
Implementation of the PEP 3156 Event-Loop with Qt
"""
-__author__ = 'Mark Harviston <mark.harviston at gmail.com>'
-__version__ = '0.1'
+__author__ = 'Mark Harviston <mark.harviston at gmail.com>, Arve Knudsen <arve.knudsen at gmail.com>'
+__version__ = '0.2'
__license__ = 'BSD 2 Clause License'
-import tulip as async
-
+import asyncio
+from asyncio import tasks
+import asyncio.events
+import socket
import time
from functools import partial, wraps
-import sys
-import logging # noqa
-import requests
-import feedparser
+import logging
from queue import Queue
from concurrent.futures import Future
-
-from .guievents import GuiEventLoop
+import subprocess
+import threading
try:
- from PySide import QtGui, QtCore
+ from PySide import QtCore
except ImportError:
- from PyQt4 import QtGui, QtCore
+ from PyQt5 import QtCore
+
+_logger = logging.getLogger(__name__)
-class QThreadWorker(QtCore.QThread):
- """
- Read from the queue.
+class _QThreadWorker(QtCore.QThread):
+ """
+ Read from the queue.
- For use by the QThreadExecutor
- """
- def __init__(self, queue):
- self.queue = queue
- self.STOP = False
- super().__init__()
+ For use by the QThreadExecutor
+ """
+ def __init__(self, queue):
+ self.queue = queue
+ self.STOP = False
+ super().__init__()
- def run(self):
- while not self.STOP:
- future, fn, args, kwargs = self.queue.get()
- if future.set_running_or_notify_cancel():
- r = fn(*args, **kwargs)
- future.set_result(r)
+ def run(self):
+ while not self.STOP:
+ future, fn, args, kwargs = self.queue.get()
+ if future.set_running_or_notify_cancel():
+ r = fn(*args, **kwargs)
+ future.set_result(r)
- def stop(self):
- self.STOP = True
+ def stop(self):
+ self.STOP = True
class QThreadExecutor(QtCore.QObject):
- """
- ThreadExecutor that produces QThreads
- Same API as `concurrent.futures.Executor`
-
- >>> with QThreadExecutor(5) as executor:
- >>> f = executor.submit(lambda x: 2 + x, x)
- >>> r = f.result()
- >>> assert r == 4
- """
- def __init__(self, max_workers, parent=None):
- super().__init__(parent)
- self.max_workers = max_workers
- self.queue = Queue()
- self.workers = [QThreadWorker(self.queue, loop) for i in range(max_workers)]
- for w in self.workers:
- w.start()
-
- def submit(self, fn, *args, **kwargs):
- future = Future()
- self.queue.put((future, fn, args, kwargs))
- return future
-
- def map(self, func, *iterables, timeout=None):
- raise NotImplemented("use as_completed on the event loop")
-
- def shutdown(self, wait=True):
- map(lambda w: w.stop(), self.workers)
-
- def __enter__(self, *args):
- pass
-
- def __exit__(self, *args):
- self.shutdown()
-
-
-def easycallback(fn):
- """
- Decorator that wraps a callback in a signal, and packs & unpacks arguments,
- Makes the wrapped function effectively threadsafe. If you call the function
- from one thread, it will be executed in the thread the QObject has affinity
- with.
-
- Remember: only objects that inherit from QObject can support signals/slots
-
- >>> class MyObject(QObject):
- >>> @easycallback
- >>> def mycallback(self):
- >>> dostuff()
- >>>
- >>> myobject = MyObject()
- >>>
- >>> @task
- >>> def mytask():
- >>> myobject.mycallback()
- >>>
- >>> loop = QEventLoop()
- >>> with loop:
- >>> loop.call_soon(mytask)
- >>> loop.run_forever()
- """
- signal = QtCore.pyqtSignal(object, tuple, dict)
-
- def out_wrapper(self, args, kwargs):
- return fn(self, *args, **kwargs)
-
- @wraps(fn)
- def in_wrapper(self, *args, **kwargs):
- return signal.emit(self, args, kwargs)
-
- signal.connect(out_wrapper)
- return in_wrapper
-
-
-class QEventLoop(QtCore.QObject, GuiEventLoop):
- """
- Implementation of tulip event loop that uses the Qt Event loop
- >>> @qumash.task
- >>> def my_task(x):
- >>> return x + 2
- >>>
- >>> app = QApplication()
- >>> with QEventLoop(app) as loop:
- >>> y = loop.call_soon(my_task)
- >>>
- >>> assert y == 4
- """
-
- def __init__(self, app=None):
- super().__init__()
- self.timers = []
-
- # Event Loop API
- def run(self):
- """Run the event loop. Block until there is nothing left to do."""
- return self.run_forever()
-
- def __enter__(self):
- async.set_event_loop(self)
-
- def __exit__(self, *args):
- self.stop()
- async.set_event_loop(None)
-
- def close(self):
- self.stop()
- self.timers = []
- self.app = None
-
- @easycallback
- def call_soon_threadsafe(self, fn, *args):
- self.call_soon(fn, *args)
-
- def _create_timer(self, delay, fn, *args, singleshot):
- timer = QtCore.QTimer(self.app)
- timer.timeout.connect(partial(fn, *args))
- if singleshot:
- timer.timeout.connect(lambda: self.timers.remove(timer))
- timer.setSingleShot(singleshot)
- timer.start(delay * 1000)
-
- self.timers.append(timer)
-
- return Cancellable(timer)
-
- def call_later(self, delay, fn, *args):
- self._create_timer(delay, fn, *args, singleshot=True)
-
- def call_at(self, at, fn, *args):
- self.call_later(at - self.time(), fn, *args)
-
- def time(self):
- return time.monotonic()
-
- def run_forever(self):
- return self.app.exec_()
-
- def stop(self):
- super().stop()
- self.app.exit()
-
-
-class Cancellable(object):
- def __init__(self, timer, loop):
- self.timer = timer
- self.loop = loop
-
- def cancel(self):
- self.loop.remove(timer)
- return self.timer.stop()
+ """
+ ThreadExecutor that produces QThreads
+ Same API as `concurrent.futures.Executor`
+
+ >>> with QThreadExecutor(5) as executor:
+ >>> f = executor.submit(lambda x: 2 + x, x)
+ >>> r = f.result()
+ >>> assert r == 4
+ """
+ def __init__(self, max_workers=10, parent=None):
+ super().__init__(parent)
+ self.__max_workers = max_workers
+ self.__queue = Queue()
+ self.__workers = [_QThreadWorker(self.__queue) for i in range(max_workers)]
+ for w in self.__workers:
+ w.start()
+
+ def submit(self, fn, *args, **kwargs):
+ future = Future()
+ self.__queue.put((future, fn, args, kwargs))
+ return future
+
+ def map(self, func, *iterables, timeout=None):
+ raise NotImplemented("use as_completed on the event loop")
+
+ def close(self):
+ for w in self.__workers:
+ w.stop()
+
+ def __enter__(self, *args):
+ pass
+
+ def __exit__(self, *args):
+ self.close()
+
+
+def _easycallback(fn):
+ """
+ Decorator that wraps a callback in a signal, and packs & unpacks arguments,
+ Makes the wrapped function effectively threadsafe. If you call the function
+ from one thread, it will be executed in the thread the QObject has affinity
+ with.
+
+ Remember: only objects that inherit from QObject can support signals/slots
+
+ >>> class MyObject(QObject):
+ >>> @_easycallback
+ >>> def mycallback(self):
+ >>> dostuff()
+ >>>
+ >>> myobject = MyObject()
+ >>>
+ >>> @task
+ >>> def mytask():
+ >>> myobject.mycallback()
+ >>>
+ >>> loop = QEventLoop()
+ >>> with loop:
+ >>> loop.call_soon(mytask)
+ >>> loop.run_forever()
+ """
+ def out_wrapper(self, args, kwargs):
+ return fn(self, *args, **kwargs)
+
+ @wraps(fn)
+ def in_wrapper(self, *args, **kwargs):
+ return signaler.signal.emit(self, args, kwargs)
+
+ class Signaler(QtCore.QObject):
+ signal = QtCore.pyqtSignal(object, tuple, dict)
+
+ signaler = Signaler()
+ signaler.signal.connect(out_wrapper)
+ return in_wrapper
+
+
+class QEventLoop(QtCore.QObject, asyncio.events.AbstractEventLoop):
+ """
+ Implementation of asyncio event loop that uses the Qt Event loop
+ >>> @quamash.task
+ >>> def my_task(x):
+ >>> return x + 2
+ >>>
+ >>> app = QApplication()
+ >>> with QEventLoop(app) as loop:
+ >>> y = loop.call_soon(my_task)
+ >>>
+ >>> assert y == 4
+ """
+ def __init__(self, app):
+ super().__init__()
+
+ self.__start_io_event_loop()
+ self.__timers = []
+ self.__app = app
+ self.__is_running = False
+ self.__debug_enabled = False
+ self.__default_executor = None
+
+ def run_forever(self):
+ self.__is_running = True
+ _logger.debug('Starting Qt event loop')
+ try:
+ rslt = self.__app.exec_()
+ return rslt
+ finally:
+ self.__is_running = False
+
+ def run_until_complete(self, future):
+ """Run until Future is complete."""
+ future = tasks.async(future, loop=self)
+ future.add_done_callback(self.stop)
+ self.run_forever()
+ future.remove_done_callback(self.stop)
+ if not future.done():
+ raise RuntimeError('Event loop stopped before Future completed.')
+
+ return future.result()
+
+ def stop(self):
+ """Stop event loop."""
+ _logger.debug('Stopping eventloop...')
+ self.__io_event_loop.call_soon_threadsafe(self.__io_event_loop.stop)
+ self.__app.exit()
+ _logger.debug('Stopped eventloop')
+
+ def is_running(self):
+ """Is event loop running?"""
+ return self.__is_running
+
+ def close(self):
+ """Close event loop."""
+ self.stop()
+ self.__timers = []
+ self.__app = None
+
+ def call_later(self, delay, callback, *args):
+ """Register callback to be invoked after a certain delay."""
+ self.__create_timer(delay, callback, *args)
+
+ def time(self):
+ """Get time according to event loop's clock."""
+ return time.monotonic()
+
+ # Methods for interacting with threads.
+
+ @_easycallback
+ def call_soon_threadsafe(self, callback, *args):
+ """Thread-safe version of call_soon."""
+ self.call_soon(callback, *args)
+
+ def call_at(self, when, callback, *args):
+ """Register callback to be invoked at a certain time."""
+ self.call_later(when - self.time(), callback, *args)
+
+ def run_in_executor(self, executor, callback, *args):
+ if isinstance(callback, events.Handle):
+ assert not args
+ assert not isinstance(callback, events.TimerHandle)
+ if callback.cancelled:
+ f = futures.Future()
+ f.set_result(None)
+ return f
+ callback, args = callback.callback, callback.args
+
+ if executor is None:
+ executor = self.__default_executor
+ if executor is None:
+ executor = self.__default_executor = QThreadExecutor()
+ return self.wrap_future(executor.submit(callback, *args))
+
+ def set_default_executor(self, executor):
+ self.__default_executor = executor
+
+ # Network I/O methods returning Futures.
+
+ def getaddrinfo(self, host, port, *, family=0, type_=0, proto=0, flags=0):
+ return self._io_helper(
+ self.__io_event_loop.getaddrinfo, (host, port), {
+ 'family': family, 'type': type_, 'proto': proto, 'flags': flags,
+ })
+
+ def getnameinfo(self, sockaddr, flags=0):
+ return self._io_helper(self.__io_event_loop.getnameinfo, (sockaddr, flags), {})
+
+ def create_connection(self, protocol_factory, host=None, port=None, *,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
+ raise NotImplementedError
+
+ def create_connection(
+ self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0,
+ sock=None, local_addr=None, server_hostname=None
+ ):
+ return self._io_helper(
+ self.__io_event_loop.create_connection,
+ (protocol_factory, host, port), {
+ 'family': family, 'proto': proto, 'flags': flags, 'sock': sock
+ })
+
+ def create_server(self, protocol_factory, host=None, port=None, *,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None):
+ """A coroutine which creates a TCP server bound to host and port.
+
+ The return value is a Server object which can be used to stop
+ the service.
+
+ If host is an empty string or None all interfaces are assumed
+ and a list of multiple sockets will be returned (most likely
+ one for IPv4 and another one for IPv6).
+
+ family can be set to either AF_INET or AF_INET6 to force the
+ socket to use IPv4 or IPv6. If not set it will be determined
+ from host (defaults to AF_UNSPEC).
+
+ flags is a bitmask for getaddrinfo().
+
+ sock can optionally be specified in order to use a preexisting
+ socket object.
+
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
+
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+
+ reuse_address tells the kernel to reuse a local socket in
+ TIME_WAIT state, without waiting for its natural timeout to
+ expire. If not specified will automatically be set to True on
+ UNIX.
+ """
+ raise NotImplementedError
+
+ def create_unix_connection(self, protocol_factory, path, *,
+ ssl=None, sock=None,
+ server_hostname=None):
+ raise NotImplementedError
+
+ def create_unix_server(self, protocol_factory, path, *,
+ sock=None, backlog=100, ssl=None):
+ """A coroutine which creates a UNIX Domain Socket server.
+
+ The return value is a Server object, which can be used to stop
+ the service.
+
+ path is a str, representing a file systsem path to bind the
+ server socket to.
+
+ sock can optionally be specified in order to use a preexisting
+ socket object.
+
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
+
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+ """
+ raise NotImplementedError
+
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None, *,
+ family=0, proto=0, flags=0):
+ raise NotImplementedError
+
+ # Pipes and subprocesses.
+
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """Register read pipe in event loop.
+
+ protocol_factory should instantiate object with Protocol interface.
+ pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ ReadTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
+
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """Register write pipe in event loop.
+
+ protocol_factory should instantiate object with BaseProtocol interface.
+ Pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ WriteTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
+
+ def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
+
+ def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
+
+ # Ready-based callback registration methods.
+ # The add_*() methods return None.
+ # The remove_*() methods return True if something was removed,
+ # False if there was nothing to delete.
+
+ def add_reader(self, fd, callback, *args):
+ return _ready_helper(self.__io_event_loop.add_reader, fd, callback, *args)
+
+ def remove_reader(self, fd):
+ return _ready_helper(self.__io_event_loop.remove_reader, fd)
+
+ def add_writer(self, fd, callback, *args):
+ return _ready_helper(self.__io_event_loop.add_writer, fd, callback, *args)
+
+ def remove_writer(self, fd):
+ return _ready_helper(self.__io_event_loop.remove_writer, fd)
+
+ # Completion based I/O methods returning Futures.
+
+ def sock_recv(self, sock, nbytes):
+ return self._io_helper(
+ self.__io_event_loop.sock_recv,
+ (sock, nbytes),
+ {},
+ )
+
+ def sock_sendall(self, sock, data):
+ return self._io_helper(self.__io_event_loop.sock_sendall, (sock, data), {})
+
+ def sock_connect(self, sock, address):
+ return self._io_helper(self.__io_event_loop.sock_connect, (sock, address), {})
+
+ def sock_accept(self, sock):
+ return self._io_helper(self.__io_event_loop.sock_accept, (sock, ), {})
+
+ # Signal handling.
+
+ def __handler_helper(self, target, *args):
+ lock = threading.Lock()
+ lock.acquire()
+ handler = None
+
+ def helper_target():
+ nonlocal handler
+ handler = target(*args)
+ lock.release()
+
+ self.__io_event_loop.call_soon_threadsafe(helper_target)
+ lock.acquire()
+ return handler
+
+ def add_signal_handler(self, sig, callback, *args):
+ return self.__handler_helper(self.add_signal_handler, sig, callback, *args)
+
+ def remove_signal_handler(self, sig):
+ return self.__handler_helper(self.remove_signal_handler, sig)
+
+ # Error handlers.
+
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
+
+ def default_exception_handler(self, context):
+ raise NotImplementedError
+
+ def call_exception_handler(self, context):
+ raise NotImplementedError
+
+ # Debug flag management.
+
+ def get_debug(self):
+ return self.__debug_enabled
+
+ def set_debug(self, enabled):
+ self.__debug_enabled = enabled
+ return self.run_forever()
+
+ def __enter__(self):
+ asyncio.set_event_loop(self)
+
+ def __exit__(self, *args):
+ try:
+ self.stop()
+ finally:
+ asyncio.set_event_loop(None)
+ if self.__default_executor is not None:
+ self.__default_executor.close()
+
+ def __create_timer(self, delay, fn, *args):
+ timer = QtCore.QTimer(self.__app)
+ timer.timeout.connect(lambda: fn(*args))
+ timer.timeout.connect(lambda: self.__timers.remove(timer))
+ timer.setSingleShot(True)
+ timer.start(delay * 1000)
+ self.__timers.append(timer)
+
+ return _Cancellable(timer, self)
+
+ def __start_io_event_loop(self):
+ """Start the I/O event loop which we defer to for performing I/O on another thread.
+ """
+ self.__event_loop_started = threading.Lock()
+ self.__event_loop_started.acquire()
+ threading.Thread(None, self.__io_event_loop_thread).start()
+ self.__event_loop_started.acquire()
+
+ def __io_event_loop_thread(self):
+ """Worker thread for running the I/O event loop."""
+ io_event_loop = asyncio.get_event_loop_policy().new_event_loop()
+ assert isinstance(io_event_loop, asyncio.AbstractEventLoop)
+ io_event_loop.set_debug(True)
+ asyncio.set_event_loop(io_event_loop)
+ self.__io_event_loop = io_event_loop
+ self.__event_loop_started.release()
+ self.__io_event_loop.run_forever()
+
+
+class _Cancellable(object):
+ def __init__(self, timer, loop):
+ self.__timer = timer
+ self.__loop = loop
+
+ def cancel(self):
+ self.__loop.remove(timer)
+ return self.__timer.stop()
diff --git a/quamash/guievents.py b/quamash/guievents.py
deleted file mode 100644
index c3fe2c5..0000000
--- a/quamash/guievents.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/python3
-"""
-Adapted from work at microsoft to work with the tk event loop
-
-"""
-from tulip.events import AbstractEventLoop
-from tulip import events
-import tulip
-import concurrent
-import threading
-import logging
-from tulip import futures
-
-_MAX_WORKERS = 10
-
-
-class GuiEventLoop(AbstractEventLoop):
- # Methods returning Futures for interacting with threads.
- def __init__(self):
- pass
- self._start_io_event_loop()
-
- def _start_io_event_loop(self):
- """Starts the I/O event loop which we defer to for doing I/O running on
- another thread"""
- self._event_loop_started = threading.Lock()
- self._event_loop_started.acquire()
- threading.Thread(None, self._io_event_loop_thread).start()
- self._event_loop_started.acquire()
-
- def _io_event_loop_thread(self):
- """Worker thread for running the I/O event loop"""
- io_event_loop = tulip.get_event_loop_policy().new_event_loop()
- tulip.set_event_loop(io_event_loop)
- assert isinstance(io_event_loop, AbstractEventLoop)
- self._io_event_loop = io_event_loop
- self._event_loop_started.release()
- self._io_event_loop.run_forever()
-
- def stop(self):
- self._io_event_loop.stop()
-
- def wrap_future(self, future):
- if isinstance(future, futures.Future):
- return future # Don't wrap our own type of Future.
- new_future = futures.Future()
- future.add_done_callback(
- lambda future:
- self.call_soon_threadsafe(new_future._copy_state, future))
- return new_future
-
- def run_until_complete(self, future, timeout=None): # NEW!
- """Run the event loop until a Future is done.
-
- Return the Future's result, or raise its exception.
-
- If timeout is not None, run it for at most that long;
- if the Future is still not done, raise TimeoutError
- (but don't cancel the Future).
- """
- assert isinstance(future, futures.Future)
-
- start_time = self.time()
- while not future.done():
- if (timeout is None) or ((self.time() - start_time) < timeout):
- self.run_once()
- else:
- raise tulip.TimeoutError
-
- return future.result()
-
- def call_repeatedly(self, interval, callback, *args): # NEW!
- return _CancelJobRepeatedly(self, interval, callback, args)
-
- def call_soon(self, callback, *args):
- return self.call_later(0, callback, *args)
-
- def call_soon_threadsafe(self, callback, *args):
- return self.call_soon(callback, *args)
-
- def run_in_executor(self, executor, callback, *args):
- if isinstance(callback, events.Handle):
- assert not args
- assert not isinstance(callback, events.TimerHandle)
- if callback.cancelled:
- f = futures.Future()
- f.set_result(None)
- return f
- callback, args = callback.callback, callback.args
-
- if executor is None:
- executor = self._default_executor
- if executor is None:
- executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
- self._default_executor = executor
- return self.wrap_future(executor.submit(callback, *args))
-
- def _io_helper(self, target, args, kwargs):
- lock = threading.Lock()
- lock.acquire()
- res = None
-
- def helper_target():
- nonlocal res
- res = target(*args, **kwargs)
- lock.release()
-
- handler = self._io_event_loop.call_soon_threadsafe(helper_target)
- lock.acquire()
- return res
-
- # Network I/O methods returning Futures.
- def getaddrinfo(
- self, host, port, *,
- family=0, type=0, proto=0, flags=0):
-
- return self._io_helper(
- self._io_event_loop.getaddrinfo,
- (host, port),
- {
- 'family': family, 'type': type,
- 'proto': proto, 'flags': flags,
- }
- )
-
- def getnameinfo(self, sockaddr, flags=0):
- return self._io_helper(
- self._io_event_loop.getnameinfo,
- (sockaddr, flags),
- {}
- )
-
- def create_connection(
- self, protocol_factory, host=None, port=None, *,
- family=0, proto=0, flags=0, sock=None):
-
- return self._io_helper(
- self._io_event_loop.create_connection,
- (protocol_factory, host, port),
- {
- 'family': family, 'proto': proto,
- 'flags': flags, 'sock': sock
- }
- )
-
- def start_serving(
- self, protocol_factory, host=None, port=None, *,
- family=0, proto=0, flags=0, sock=None):
-
- return self._io_helper(
- self._io_event_loop.start_serving,
- (protocol_factory, host, port),
- {
- 'family': family, 'proto': proto,
- 'flags': flags, 'sock': sock
- },
- )
-
- # Ready-based callback registration methods.
- # The add_*() methods return a Handler.
- # The remove_*() methods return True if something was removed,
- # False if there was nothing to delete.
-
- def _handler_helper(self, target, *args):
- lock = threading.Lock()
- lock.acquire()
- handler = None
-
- def helper_target():
- nonlocal handler
- handler = target(*args)
- lock.release()
-
- self._io_event_loop.call_soon_threadsafe(helper_target)
- lock.acquire()
- return handler
-
- def add_reader(self, fd, callback, *args):
- return _ready_helper(self._io_event_loop.add_reader, fd, callback, *args)
-
- def remove_reader(self, fd):
- return _ready_helper(self._io_event_loop.remove_reader, fd)
-
- def add_writer(self, fd, callback, *args):
- return _ready_helper(self._io_event_loop.add_writer, fd, callback, *args)
-
- def remove_writer(self, fd):
- return _ready_helper(self._io_event_loop.remove_writer, fd)
-
- def connect_read_pipe(self, protocol_factory, pipe):
- raise NotImplemented # FIXME
-
- def connect_write_pipe(self, protocol_factory, pipe):
- raise NotImplemented # FIXME
-
- # Completion based I/O methods returning Futures.
-
- def sock_recv(self, sock, nbytes):
- return self._io_helper(
- self._io_event_loop.sock_recv,
- (sock, nbytes),
- {},
- )
-
- def sock_sendall(self, sock, data):
- return self._io_helper(
- self._io_event_loop.sock_sendall,
- (sock, data),
- {}
- )
-
- def sock_connect(self, sock, address):
- return self._io_helper(
- self._io_event_loop.sock_connect,
- (sock, address),
- {},
- )
- return self.run_in_executor(None, sock.connect, address)
-
- def sock_accept(self, sock):
- return self._io_helper(
- self._io_event_loop.sock_accept,
- (sock, ),
- {}
- )
-
- # Signal handling.
-
- def add_signal_handler(self, sig, callback, *args):
- return self._handler_helper(self.add_signal_handler, sig, callback, *args)
-
- def remove_signal_handler(self, sig):
- return self._handler_helper(self.remove_signal_handler, sig)
-
-
-class _CancelJobRepeatedly(object):
- """Object that allows cancelling of a call_repeatedly"""
- def __init__(self, event_loop, delay, callback, args):
- self.event_loop = event_loop
- self.delay = delay
- self.callback = callback
- self.args = args
- self.post()
-
- def invoke(self):
- self.callback(*self.args)
- self.post()
-
- def post(self):
- self.canceler = self.event_loop.call_later(self.delay, self.invoke)
-
- def cancel(self):
- self.canceler.cancel()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list