[Pkg-bitcoin-commits] [python-quamash] 01/269: initial commit
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:10 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit f83a9865f339fe529276646571767c2293e158fe
Author: Mark Harviston <mark.harviston at gmail.com>
Date: Mon Jun 10 15:28:27 2013 -0700
initial commit
---
README | 47 +++++++++++
README.rst | 1 +
guievents.py | 253 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
quamash.py | 199 ++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 500 insertions(+)
diff --git a/README b/README
new file mode 100644
index 0000000..d09a636
--- /dev/null
+++ b/README
@@ -0,0 +1,47 @@
+=======
+Quamash
+=======
+Tulip API for the Qt Event-Loop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+:author: Mark Harviston <mark.harviston at gmail.com>
+
+Usage
+=====
+
+import quamash
+import tulip
+import time
+
+from PyQt4 import QtCore, QtGui
+# - or - #
+from PySide import QtCore, QtGui
+
+def identity(x):
+ time.sleep(10)
+ return x
+
+@quamash.task
+def my_task(loop, executor):
+ for x in range(5):
+ y = yield from loop.run_in_executor(executor, identity, x)
+ assert x == y
+
+if __name__ == '__main__':
+ app = QtGui.QApplication([])
+ loop = quamash.QEventLoop(app)
+
+ win = QtGui.QMainWindow()
+ win.show()
+
+ with loop:
+
+
+
+Name
+====
+Tulip related projects are being named after other flowers, Quamash starts with a "Q".
+
+License
+=======
+BSD 3 Clause License
+
diff --git a/README.rst b/README.rst
new file mode 120000
index 0000000..100b938
--- /dev/null
+++ b/README.rst
@@ -0,0 +1 @@
+README
\ No newline at end of file
diff --git a/guievents.py b/guievents.py
new file mode 100644
index 0000000..c3fe2c5
--- /dev/null
+++ b/guievents.py
@@ -0,0 +1,253 @@
+#!/usr/bin/python3
+"""
+Adapted from work at microsoft to work with the tk event loop
+
+"""
+from tulip.events import AbstractEventLoop
+from tulip import events
+import tulip
+import concurrent
+import threading
+import logging
+from tulip import futures
+
+_MAX_WORKERS = 10
+
+
+class GuiEventLoop(AbstractEventLoop):
+    """Base event loop for GUI toolkits that hands I/O to a helper loop.
+
+    A regular tulip event loop is created and run on a background thread;
+    the network, socket, reader/writer and signal methods below forward to
+    it.  Scheduling primitives used here but not defined here
+    (``call_later``, ``time``, ``run_once``) must be provided by a subclass.
+    """
+
+    # Executor used by run_in_executor() when the caller passes None.
+    # Created lazily on first use.
+    _default_executor = None
+
+    def __init__(self):
+        self._start_io_event_loop()
+
+    def _start_io_event_loop(self):
+        """Start the I/O event loop on another thread and wait for it.
+
+        Returns only after ``self._io_event_loop`` has been assigned by the
+        worker thread.
+        """
+        self._event_loop_started = threading.Event()
+        threading.Thread(target=self._io_event_loop_thread).start()
+        self._event_loop_started.wait()
+
+    def _io_event_loop_thread(self):
+        """Worker thread body: create the I/O event loop and run it."""
+        io_event_loop = tulip.get_event_loop_policy().new_event_loop()
+        tulip.set_event_loop(io_event_loop)
+        assert isinstance(io_event_loop, AbstractEventLoop)
+        self._io_event_loop = io_event_loop
+        # Unblock _start_io_event_loop() now that the loop is published.
+        self._event_loop_started.set()
+        self._io_event_loop.run_forever()
+
+    def stop(self):
+        """Stop the background I/O event loop."""
+        self._io_event_loop.stop()
+
+    def wrap_future(self, future):
+        """Return a tulip Future that mirrors *future*.
+
+        A tulip Future is returned unchanged.  Any other future gets a done
+        callback that copies its state into a new tulip Future via
+        ``call_soon_threadsafe``.
+        """
+        if isinstance(future, futures.Future):
+            return future  # Don't wrap our own type of Future.
+        new_future = futures.Future()
+
+        def copy_when_done(done_future):
+            self.call_soon_threadsafe(new_future._copy_state, done_future)
+
+        future.add_done_callback(copy_when_done)
+        return new_future
+
+    def run_until_complete(self, future, timeout=None):  # NEW!
+        """Run the event loop until a Future is done.
+
+        Return the Future's result, or raise its exception.
+
+        If timeout is not None, run it for at most that long;
+        if the Future is still not done, raise TimeoutError
+        (but don't cancel the Future).
+        """
+        assert isinstance(future, futures.Future)
+
+        start_time = self.time()
+        while not future.done():
+            if timeout is not None and (self.time() - start_time) >= timeout:
+                raise tulip.TimeoutError
+            # run_once() is expected from the concrete subclass.
+            self.run_once()
+
+        return future.result()
+
+    def call_repeatedly(self, interval, callback, *args):  # NEW!
+        """Call *callback* every *interval* seconds; return a cancel handle."""
+        return _CancelJobRepeatedly(self, interval, callback, args)
+
+    def call_soon(self, callback, *args):
+        """Schedule *callback* with a zero delay via ``call_later``."""
+        return self.call_later(0, callback, *args)
+
+    def call_soon_threadsafe(self, callback, *args):
+        """Forward to ``call_soon``; subclasses may add real thread safety."""
+        return self.call_soon(callback, *args)
+
+    def run_in_executor(self, executor, callback, *args):
+        """Submit *callback* to *executor* and return a wrapped Future.
+
+        *callback* may be an ``events.Handle`` (but not a ``TimerHandle``);
+        a cancelled handle yields an already-completed Future holding None.
+        When *executor* is None a shared ``ThreadPoolExecutor`` with
+        ``_MAX_WORKERS`` threads is created on first use.
+        """
+        if isinstance(callback, events.Handle):
+            assert not args
+            assert not isinstance(callback, events.TimerHandle)
+            if callback.cancelled:
+                f = futures.Future()
+                f.set_result(None)
+                return f
+            callback, args = callback.callback, callback.args
+
+        if executor is None:
+            executor = self._default_executor
+            if executor is None:
+                # A bare ``import concurrent`` does not load the ``futures``
+                # submodule, so import it explicitly where it is needed.
+                import concurrent.futures
+                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
+                self._default_executor = executor
+        return self.wrap_future(executor.submit(callback, *args))
+
+    def _io_helper(self, target, args, kwargs):
+        """Call ``target(*args, **kwargs)`` on the I/O loop thread and wait.
+
+        Returns what the call returned.  An exception raised by the call is
+        re-raised in the calling thread instead of leaving the caller
+        blocked forever.
+        """
+        done = threading.Event()
+        outcome = {}
+
+        def helper_target():
+            try:
+                outcome['result'] = target(*args, **kwargs)
+            except Exception as exc:
+                outcome['error'] = exc
+            finally:
+                # Always wake the waiting caller, even on failure.
+                done.set()
+
+        self._io_event_loop.call_soon_threadsafe(helper_target)
+        done.wait()
+        if 'error' in outcome:
+            raise outcome['error']
+        return outcome.get('result')
+
+    # Network I/O methods returning Futures.
+    def getaddrinfo(
+            self, host, port, *,
+            family=0, type=0, proto=0, flags=0):
+        """Forward ``getaddrinfo`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.getaddrinfo,
+            (host, port),
+            {
+                'family': family, 'type': type,
+                'proto': proto, 'flags': flags,
+            },
+        )
+
+    def getnameinfo(self, sockaddr, flags=0):
+        """Forward ``getnameinfo`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.getnameinfo,
+            (sockaddr, flags),
+            {},
+        )
+
+    def create_connection(
+            self, protocol_factory, host=None, port=None, *,
+            family=0, proto=0, flags=0, sock=None):
+        """Forward ``create_connection`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.create_connection,
+            (protocol_factory, host, port),
+            {
+                'family': family, 'proto': proto,
+                'flags': flags, 'sock': sock,
+            },
+        )
+
+    def start_serving(
+            self, protocol_factory, host=None, port=None, *,
+            family=0, proto=0, flags=0, sock=None):
+        """Forward ``start_serving`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.start_serving,
+            (protocol_factory, host, port),
+            {
+                'family': family, 'proto': proto,
+                'flags': flags, 'sock': sock,
+            },
+        )
+
+    # Ready-based callback registration methods.
+    # The add_*() methods return a Handler.
+    # The remove_*() methods return True if something was removed,
+    # False if there was nothing to delete.
+
+    def _handler_helper(self, target, *args):
+        """Call ``target(*args)`` on the I/O loop thread; return its result."""
+        return self._io_helper(target, args, {})
+
+    def add_reader(self, fd, callback, *args):
+        """Register a read callback for *fd* on the I/O event loop."""
+        return self._handler_helper(
+            self._io_event_loop.add_reader, fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove the read callback for *fd* from the I/O event loop."""
+        return self._handler_helper(self._io_event_loop.remove_reader, fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Register a write callback for *fd* on the I/O event loop."""
+        return self._handler_helper(
+            self._io_event_loop.add_writer, fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove the write callback for *fd* from the I/O event loop."""
+        return self._handler_helper(self._io_event_loop.remove_writer, fd)
+
+    def connect_read_pipe(self, protocol_factory, pipe):
+        """Not implemented yet."""
+        raise NotImplementedError  # FIXME
+
+    def connect_write_pipe(self, protocol_factory, pipe):
+        """Not implemented yet."""
+        raise NotImplementedError  # FIXME
+
+    # Completion based I/O methods returning Futures.
+
+    def sock_recv(self, sock, nbytes):
+        """Forward ``sock_recv`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.sock_recv,
+            (sock, nbytes),
+            {},
+        )
+
+    def sock_sendall(self, sock, data):
+        """Forward ``sock_sendall`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.sock_sendall,
+            (sock, data),
+            {},
+        )
+
+    def sock_connect(self, sock, address):
+        """Forward ``sock_connect`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.sock_connect,
+            (sock, address),
+            {},
+        )
+
+    def sock_accept(self, sock):
+        """Forward ``sock_accept`` to the I/O event loop."""
+        return self._io_helper(
+            self._io_event_loop.sock_accept,
+            (sock, ),
+            {},
+        )
+
+    # Signal handling.
+
+    def add_signal_handler(self, sig, callback, *args):
+        """Register a signal handler on the I/O event loop.
+
+        NOTE(review): the I/O loop runs on a non-main thread; the underlying
+        loop may refuse to install signal handlers there -- confirm.  Any
+        such error is re-raised to the caller.
+        """
+        return self._handler_helper(
+            self._io_event_loop.add_signal_handler, sig, callback, *args)
+
+    def remove_signal_handler(self, sig):
+        """Remove a signal handler from the I/O event loop."""
+        return self._handler_helper(
+            self._io_event_loop.remove_signal_handler, sig)
+
+
+class _CancelJobRepeatedly(object):
+    """Handle that allows cancelling a ``call_repeatedly`` job.
+
+    The job re-arms itself through ``event_loop.call_later`` after every
+    invocation until ``cancel()`` is called.
+    """
+
+    def __init__(self, event_loop, delay, callback, args):
+        self.event_loop = event_loop
+        self.delay = delay
+        self.callback = callback
+        self.args = args
+        self.canceler = None
+        self._cancelled = False
+        self.post()
+
+    def invoke(self):
+        """Run the callback once, then schedule the next run."""
+        self.callback(*self.args)
+        self.post()
+
+    def post(self):
+        """Schedule the next invocation unless the job was cancelled."""
+        # The callback itself may have called cancel(); do not re-arm then.
+        if self._cancelled:
+            return
+        self.canceler = self.event_loop.call_later(self.delay, self.invoke)
+
+    def cancel(self):
+        """Stop future invocations and cancel the pending one, if any."""
+        self._cancelled = True
+        # call_later() may not have produced a handle; tolerate None.
+        if self.canceler is not None:
+            self.canceler.cancel()
diff --git a/quamash.py b/quamash.py
new file mode 100644
index 0000000..e0b203a
--- /dev/null
+++ b/quamash.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# -*- coding=utf-8 -*- #
+# © 2013 Mark Harviston <mark.harviston at gmail.com>
+# BSD License
+"""
+Tulip hooked up to the Qt event loop, plus several utilities
+"""
+__author__ = 'Mark Harviston <mark.harviston at gmail.com>'
+__version__ = '0.1'
+
+import tulip as async
+
+import time
+from functools import partial, wraps
+import sys
+import logging # noqa
+import requests
+import feedparser
+from queue import Queue
+from concurrent.futures import Future
+
+from guievents import GuiEventLoop
+
+try:
+ from PySide import QtGui, QtCore
+except ImportError:
+ from PyQt4 import QtGui, QtCore
+
+
+class QThreadWorker(QtCore.QThread):
+    """
+    Read work items from the queue and run them.
+
+    Each item is a ``(future, fn, args, kwargs)`` tuple; the outcome of
+    ``fn(*args, **kwargs)`` is stored on ``future``.
+
+    For use by the QThreadExecutor
+    """
+
+    # Seconds to block on the queue before re-checking the stop flag, so
+    # that stop() also ends a worker that is idle.
+    _POLL_INTERVAL = 0.1
+
+    def __init__(self, queue):
+        # Initialise the Qt base before touching the instance.
+        super().__init__()
+        self.queue = queue
+        self.STOP = False
+
+    def run(self):
+        """Process queued work until ``stop()`` has been called."""
+        from queue import Empty
+
+        while not self.STOP:
+            try:
+                future, fn, args, kwargs = self.queue.get(
+                    timeout=self._POLL_INTERVAL)
+            except Empty:
+                continue
+            if not future.set_running_or_notify_cancel():
+                continue  # cancelled before it started
+            try:
+                result = fn(*args, **kwargs)
+            except Exception as exc:
+                # Report the failure through the future instead of killing
+                # the worker and leaving the caller waiting forever.
+                future.set_exception(exc)
+            else:
+                future.set_result(result)
+
+    def stop(self):
+        """Ask the worker to exit after its current item or poll interval."""
+        self.STOP = True
+
+
+class QThreadExecutor(QtCore.QObject):
+    """
+    ThreadExecutor that produces QThreads
+    Same API as `concurrent.futures.Executor`
+
+    >>> with QThreadExecutor(5) as executor:
+    >>>     f = executor.submit(lambda x: 2 + x, 2)
+    >>>     r = f.result()
+    >>>     assert r == 4
+    """
+
+    def __init__(self, max_workers, parent=None):
+        super().__init__(parent)
+        self.max_workers = max_workers
+        self.queue = Queue()
+        # QThreadWorker takes only the shared queue.
+        self.workers = [QThreadWorker(self.queue) for _ in range(max_workers)]
+        for worker in self.workers:
+            worker.start()
+
+    def submit(self, fn, *args, **kwargs):
+        """Queue ``fn(*args, **kwargs)`` and return a Future for its result."""
+        future = Future()
+        self.queue.put((future, fn, args, kwargs))
+        return future
+
+    def map(self, func, *iterables, timeout=None):
+        """Not supported; always raises NotImplementedError."""
+        raise NotImplementedError("use as_completed on the event loop")
+
+    def shutdown(self, wait=True):
+        """Ask every worker to stop.
+
+        *wait* is accepted for API compatibility; this method does not
+        block until the workers have finished.
+        """
+        # A plain loop: map() is lazy on Python 3 and would call nothing.
+        for worker in self.workers:
+            worker.stop()
+
+    def __enter__(self, *args):
+        # Return the executor so ``with ... as executor`` binds it.
+        return self
+
+    def __exit__(self, *args):
+        self.shutdown()
+
+
+class QEventLoop(QtCore.QObject, GuiEventLoop):
+    """
+    Implementation of the tulip event loop that uses the Qt event loop.
+
+    Delayed calls are implemented with ``QTimer``; everything else comes
+    from ``GuiEventLoop``.
+
+    >>> app = QtGui.QApplication([])
+    >>> with QEventLoop(app) as loop:
+    >>>     loop.call_later(1, loop.stop)
+    >>>     loop.run_forever()
+    """
+
+    # Signal used to marshal call_soon_threadsafe() onto this object's
+    # thread.  PySide names the factory ``Signal``, PyQt4 ``pyqtSignal``.
+    _call_requested = (
+        getattr(QtCore, 'Signal', None) or QtCore.pyqtSignal)(object, tuple)
+
+    def __init__(self, app=None):
+        # Initialise both bases explicitly: the PyQt4/PySide QObject
+        # constructor does not chain to the next class in the MRO.
+        QtCore.QObject.__init__(self)
+        self.timers = []
+        # Fall back to the already-running application when none is given.
+        if app is None:
+            app = QtCore.QCoreApplication.instance()
+        self.app = app
+        self._call_requested.connect(self._on_call_requested)
+        GuiEventLoop.__init__(self)
+
+    # Event Loop API
+    def run(self):
+        """Run the event loop. Block until there is nothing left to do."""
+        return self.run_forever()
+
+    def run_once(self):
+        """Process the Qt events that are currently pending."""
+        self.app.processEvents()
+
+    def __enter__(self):
+        # Imported locally: the module-level alias ``async`` is a reserved
+        # word on Python 3.7+.
+        import tulip
+        tulip.set_event_loop(self)
+        return self
+
+    def __exit__(self, *args):
+        import tulip
+        self.stop()
+        tulip.set_event_loop(None)
+
+    def close(self):
+        """Stop the loop and drop references to timers and the app."""
+        self.stop()
+        self.timers = []
+        self.app = None
+
+    def call_soon_threadsafe(self, fn, *args):
+        """Schedule ``fn(*args)`` from any thread.
+
+        The request travels through a Qt signal and is then handed to
+        ``call_soon``.
+        """
+        self._call_requested.emit(fn, args)
+
+    def _on_call_requested(self, fn, args):
+        """Receive a call_soon_threadsafe() request."""
+        self.call_soon(fn, *args)
+
+    def _forget_timer(self, timer):
+        """Drop *timer* from the bookkeeping list if it is still there."""
+        try:
+            self.timers.remove(timer)
+        except ValueError:
+            pass
+
+    def _create_timer(self, delay, fn, *args, singleshot):
+        """Start a QTimer calling ``fn(*args)`` after *delay* seconds.
+
+        Returns a ``Cancellable`` wrapping the timer.
+        """
+        timer = QtCore.QTimer(self.app)
+        timer.timeout.connect(partial(fn, *args))
+        if singleshot:
+            timer.timeout.connect(partial(self._forget_timer, timer))
+        timer.setSingleShot(singleshot)
+
+        # Keep a reference so the timer is not garbage collected.
+        self.timers.append(timer)
+        # QTimer wants whole, non-negative milliseconds.
+        timer.start(int(max(delay, 0) * 1000))
+
+        # Cancellable only calls ``.remove(timer)`` on its second argument,
+        # which the bookkeeping list supports directly.
+        return Cancellable(timer, self.timers)
+
+    def call_later(self, delay, fn, *args):
+        """Call ``fn(*args)`` once after *delay* seconds; return a handle."""
+        return self._create_timer(delay, fn, *args, singleshot=True)
+
+    def call_at(self, at, fn, *args):
+        """Call ``fn(*args)`` at loop time *at*; return a handle."""
+        return self.call_later(at - self.time(), fn, *args)
+
+    def time(self):
+        """Return the loop clock, ``time.monotonic()``."""
+        return time.monotonic()
+
+    def run_forever(self):
+        """Enter the Qt main loop; returns what ``app.exec_()`` returns."""
+        return self.app.exec_()
+
+    def stop(self):
+        """Stop the background I/O loop and leave the Qt main loop."""
+        super().stop()
+        # close() clears self.app, so tolerate a repeated stop().
+        if self.app is not None:
+            self.app.exit()
+
+class Cancellable(object):
+    """Handle that cancels a scheduled timer.
+
+    *timer* needs a ``stop()`` method.  *loop*, when given, is the owner
+    that tracks the timer and needs a ``remove(timer)`` method.
+    """
+
+    def __init__(self, timer, loop=None):
+        self.timer = timer
+        self.loop = loop
+
+    def cancel(self):
+        """Detach the timer from its owner and stop it.
+
+        Returns whatever ``timer.stop()`` returns.
+        """
+        if self.loop is not None:
+            try:
+                self.loop.remove(self.timer)
+            except ValueError:
+                # Already removed, e.g. the timer fired or was cancelled.
+                pass
+        return self.timer.stop()
+
+def easycallback(fn):
+    """
+    Decorator that routes a method call through a Qt signal, packing and
+    unpacking the arguments.  The intent is that a call made from one
+    thread is executed in the thread the QObject has affinity with.
+
+    The decorated method returns None; the wrapped function's return value
+    is not passed back to the caller.
+
+    Remember: only objects that inherit from QObject can support signals/slots
+
+    >>> class MyObject(QObject):
+    >>>     @easycallback
+    >>>     def mycallback(self):
+    >>>         dostuff()
+    >>>
+    >>> myobject = MyObject()
+    >>>
+    >>> @task
+    >>> def mytask():
+    >>>     myobject.mycallback()
+    >>>
+    >>> loop = QEventLoop()
+    >>> with loop:
+    >>>     loop.call_soon(mytask)
+    >>>     loop.run_forever()
+    """
+    # PySide names the signal factory ``Signal``, PyQt4 ``pyqtSignal``.
+    signal_type = getattr(QtCore, 'Signal', None) or QtCore.pyqtSignal
+
+    class _Relay(QtCore.QObject):
+        """Helper QObject owning the signal.
+
+        Signals only work as class attributes of a QObject subclass, so a
+        signal created as a plain local cannot be connected or emitted.
+        """
+        triggered = signal_type(object, tuple, dict)
+
+        def deliver(self, target, args, kwargs):
+            fn(target, *args, **kwargs)
+
+    relay_attr = '_easycallback_relay_' + fn.__name__
+
+    @wraps(fn)
+    def in_wrapper(self, *args, **kwargs):
+        relay = getattr(self, relay_attr, None)
+        if relay is None:
+            # One relay per instance, created on first use.
+            # NOTE(review): first use from several threads at once could
+            # create more than one relay -- confirm whether that matters.
+            relay = _Relay()
+            # Give the relay the same thread affinity as the target object.
+            relay.moveToThread(self.thread())
+            relay.triggered.connect(relay.deliver)
+            setattr(self, relay_attr, relay)
+        relay.triggered.emit(self, args, kwargs)
+
+    return in_wrapper
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list