[Pkg-bitcoin-commits] [python-quamash] 01/269: initial commit

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:10 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit f83a9865f339fe529276646571767c2293e158fe
Author: Mark Harviston <mark.harviston at gmail.com>
Date:   Mon Jun 10 15:28:27 2013 -0700

    initial commit
---
 README       |  47 +++++++++++
 README.rst   |   1 +
 guievents.py | 253 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 quamash.py   | 199 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 500 insertions(+)

diff --git a/README b/README
new file mode 100644
index 0000000..d09a636
--- /dev/null
+++ b/README
@@ -0,0 +1,47 @@
+=======
+Quamash
+=======
+Tulip API for the Qt Event-Loop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+:author: Mark Harviston <mark.harviston at gmail.com>
+
+Usage
+=====
+
+import quamash
+import tulip
+import time
+
+from PyQt4 import QtCore, QtGui
+# - or - #
+from PySide import QtCore, QtGui
+
+def identity(x):
+	time.sleep(10)
+	return x
+
+ at quamash.task
+def my_task(loop, executor):
+	for x in range(5):
+		y = yield from loop.run_in_executor(executor, identity, x)
+		assert x == y
+
+if __name__ == '__main__':
+	app = QApplication
+	loop = quamash.QEventLoop(app)
+
+	win = QtGui.QMainWindow()
+	win.show()
+
+	with loop:
+		
+
+
+Name
+====
+Tulip related projects are being named after other flowers, Quamash starts with a "Q".
+
+License
+=======
+BSD 3 Clause License
+
diff --git a/README.rst b/README.rst
new file mode 120000
index 0000000..100b938
--- /dev/null
+++ b/README.rst
@@ -0,0 +1 @@
+README
\ No newline at end of file
diff --git a/guievents.py b/guievents.py
new file mode 100644
index 0000000..c3fe2c5
--- /dev/null
+++ b/guievents.py
@@ -0,0 +1,253 @@
+#!/usr/bin/python3
+"""
+Adapted from work at microsoft to work with the tk event loop
+
+"""
+from tulip.events import AbstractEventLoop
+from tulip import events
+import tulip
+import concurrent
+import threading
+import logging
+from tulip import futures
+
+_MAX_WORKERS = 10
+
+
+class GuiEventLoop(AbstractEventLoop):
+	# Methods returning Futures for interacting with threads.
+	def __init__(self):
+		pass
+		self._start_io_event_loop()
+
+	def _start_io_event_loop(self):
+		"""Starts the I/O event loop which we defer to for doing I/O running on
+		another thread"""
+		self._event_loop_started = threading.Lock()
+		self._event_loop_started.acquire()
+		threading.Thread(None, self._io_event_loop_thread).start()
+		self._event_loop_started.acquire()
+
+	def _io_event_loop_thread(self):
+		"""Worker thread for running the I/O event loop"""
+		io_event_loop = tulip.get_event_loop_policy().new_event_loop()
+		tulip.set_event_loop(io_event_loop)
+		assert isinstance(io_event_loop, AbstractEventLoop)
+		self._io_event_loop = io_event_loop
+		self._event_loop_started.release()
+		self._io_event_loop.run_forever()
+
+	def stop(self):
+		self._io_event_loop.stop()
+
+	def wrap_future(self, future):
+		if isinstance(future, futures.Future):
+			return future  # Don't wrap our own type of Future.
+		new_future = futures.Future()
+		future.add_done_callback(
+			lambda future:
+			self.call_soon_threadsafe(new_future._copy_state, future))
+		return new_future
+
+	def run_until_complete(self, future, timeout=None):  # NEW!
+		"""Run the event loop until a Future is done.
+
+		Return the Future's result, or raise its exception.
+
+		If timeout is not None, run it for at most that long;
+		if the Future is still not done, raise TimeoutError
+		(but don't cancel the Future).
+		"""
+		assert isinstance(future, futures.Future)
+
+		start_time = self.time()
+		while not future.done():
+			if (timeout is None) or ((self.time() - start_time) < timeout):
+				self.run_once()
+			else:
+				raise tulip.TimeoutError
+
+		return future.result()
+
+	def call_repeatedly(self, interval, callback, *args):  # NEW!
+		return _CancelJobRepeatedly(self, interval, callback, args)
+
+	def call_soon(self, callback, *args):
+		return self.call_later(0, callback, *args)
+
+	def call_soon_threadsafe(self, callback, *args):
+		return self.call_soon(callback, *args)
+
+	def run_in_executor(self, executor, callback, *args):
+		if isinstance(callback, events.Handle):
+			assert not args
+			assert not isinstance(callback, events.TimerHandle)
+			if callback.cancelled:
+				f = futures.Future()
+				f.set_result(None)
+				return f
+			callback, args = callback.callback, callback.args
+
+		if executor is None:
+			executor = self._default_executor
+			if executor is None:
+				executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
+				self._default_executor = executor
+		return self.wrap_future(executor.submit(callback, *args))
+
+	def _io_helper(self, target, args, kwargs):
+		lock = threading.Lock()
+		lock.acquire()
+		res = None
+
+		def helper_target():
+			nonlocal res
+			res = target(*args, **kwargs)
+			lock.release()
+
+		handler = self._io_event_loop.call_soon_threadsafe(helper_target)
+		lock.acquire()
+		return res
+
+	# Network I/O methods returning Futures.
+	def getaddrinfo(
+		self, host, port, *,
+		family=0, type=0, proto=0, flags=0):
+
+		return self._io_helper(
+			self._io_event_loop.getaddrinfo,
+			(host, port),
+			{
+				'family': family, 'type': type,
+				'proto': proto, 'flags': flags,
+			}
+		)
+
+	def getnameinfo(self, sockaddr, flags=0):
+		return self._io_helper(
+			self._io_event_loop.getnameinfo,
+			(sockaddr, flags),
+			{}
+		)
+
+	def create_connection(
+		self, protocol_factory, host=None, port=None, *,
+		family=0, proto=0, flags=0, sock=None):
+
+		return self._io_helper(
+			self._io_event_loop.create_connection,
+			(protocol_factory, host, port),
+			{
+				'family': family, 'proto': proto,
+				'flags': flags, 'sock': sock
+			}
+		)
+
+	def start_serving(
+		self, protocol_factory, host=None, port=None, *,
+		family=0, proto=0, flags=0, sock=None):
+
+		return self._io_helper(
+			self._io_event_loop.start_serving,
+			(protocol_factory, host, port),
+			{
+				'family': family, 'proto': proto,
+				'flags': flags, 'sock': sock
+			},
+		)
+
+	# Ready-based callback registration methods.
+	# The add_*() methods return a Handler.
+	# The remove_*() methods return True if something was removed,
+	# False if there was nothing to delete.
+
+	def _handler_helper(self, target, *args):
+		lock = threading.Lock()
+		lock.acquire()
+		handler = None
+
+		def helper_target():
+			nonlocal handler
+			handler = target(*args)
+			lock.release()
+
+		self._io_event_loop.call_soon_threadsafe(helper_target)
+		lock.acquire()
+		return handler
+
+	def add_reader(self, fd, callback, *args):
+		return _ready_helper(self._io_event_loop.add_reader, fd, callback, *args)
+
+	def remove_reader(self, fd):
+		return _ready_helper(self._io_event_loop.remove_reader, fd)
+
+	def add_writer(self, fd, callback, *args):
+		return _ready_helper(self._io_event_loop.add_writer, fd, callback, *args)
+
+	def remove_writer(self, fd):
+		return _ready_helper(self._io_event_loop.remove_writer, fd)
+
+	def connect_read_pipe(self, protocol_factory, pipe):
+		raise NotImplemented  # FIXME
+
+	def connect_write_pipe(self, protocol_factory, pipe):
+		raise NotImplemented  # FIXME
+
+	# Completion based I/O methods returning Futures.
+
+	def sock_recv(self, sock, nbytes):
+		return self._io_helper(
+			self._io_event_loop.sock_recv,
+			(sock, nbytes),
+			{},
+		)
+
+	def sock_sendall(self, sock, data):
+		return self._io_helper(
+			self._io_event_loop.sock_sendall,
+			(sock, data),
+			{}
+		)
+
+	def sock_connect(self, sock, address):
+		return self._io_helper(
+			self._io_event_loop.sock_connect,
+			(sock, address),
+			{},
+		)
+		return self.run_in_executor(None, sock.connect, address)
+
+	def sock_accept(self, sock):
+		return self._io_helper(
+			self._io_event_loop.sock_accept,
+			(sock, ),
+			{}
+		)
+
+	# Signal handling.
+
+	def add_signal_handler(self, sig, callback, *args):
+		return self._handler_helper(self.add_signal_handler, sig, callback, *args)
+
+	def remove_signal_handler(self, sig):
+		return self._handler_helper(self.remove_signal_handler, sig)
+
+
+class _CancelJobRepeatedly(object):
+	"""Object that allows cancelling of a call_repeatedly"""
+	def __init__(self, event_loop, delay, callback, args):
+		self.event_loop = event_loop
+		self.delay = delay
+		self.callback = callback
+		self.args = args
+		self.post()
+
+	def invoke(self):
+		self.callback(*self.args)
+		self.post()
+
+	def post(self):
+		self.canceler = self.event_loop.call_later(self.delay, self.invoke)
+
+	def cancel(self):
+		self.canceler.cancel()
diff --git a/quamash.py b/quamash.py
new file mode 100644
index 0000000..e0b203a
--- /dev/null
+++ b/quamash.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# -*- coding=utf-8 -*- #
+# © 2013 Mark Harviston <mark.harviston at gmail.com>
+# BSD License
+"""
+Tulip hooked up to the Qt Event Loop several utilities
+"""
+__author__ = 'Mark Harviston <mark.harviston at gmail.com>
+__version__ = '0.1'
+
+import tulip as async
+
+import time
+from functools import partial, wraps
+import sys
+import logging  # noqa
+import requests
+import feedparser
+from queue import Queue
+from concurrent.futures import Future
+
+from guievents import GuiEventLoop
+
+try:
+	from PySide import QtGui, QtCore
+except ImportError:
+	from PyQt4 import QtGui, QtCore
+
+
+class QThreadWorker(QtCore.QThread):
+	"""
+	Read from the queue.
+
+	For use by the QThreadExecutor
+	"""
+	def __init__(self, queue):
+		self.queue = queue
+		self.STOP = False
+		super().__init__()
+
+	def run(self):
+		while not self.STOP:
+			future, fn, args, kwargs = self.queue.get()
+			if future.set_running_or_notify_cancel():
+				r = fn(*args, **kwargs)
+				future.set_result(r)
+
+	def stop(self):
+		self.STOP = True
+
+
+class QThreadExecutor(QtCore.QObject):
+	"""
+	ThreadExecutor that produces QThreads
+	Same API as `concurrent.futures.Executor`
+
+	>>> with QThreadExecutor(5) as executor:
+	>>>     f = executor.submit(lambda x: 2 + x, x)
+	>>>     r = f.result()
+	>>>     assert r == 4
+	"""
+	def __init__(self, max_workers, parent=None):
+		super().__init__(parent)
+		self.max_workers = max_workers
+		self.queue = Queue()
+		self.workers = [QThreadWorker(self.queue, loop) for i in range(max_workers)]
+		for w in self.workers:
+			w.start()
+
+	def submit(self, fn, *args, **kwargs):
+		future = Future()
+		self.queue.put((future, fn, args, kwargs))
+		return future
+
+	def map(self, func, *iterables, timeout=None):
+		raise NotImplemented("use as_completed on the event loop")
+
+	def shutdown(self, wait=True):
+		map(lambda w: w.stop(), self.workers)
+
+	def __enter__(self, *args):
+		pass
+
+	def __exit__(self, *args):
+		self.shutdown()
+
+
+class QEventLoop(QtCore.QObject, GuiEventLoop):
+	"""
+	Implementation of tulip event loop that uses the Qt Event loop
+	>>> @qumash.task
+	>>> def my_task(x):
+	>>>     return x + 2
+	>>>
+	>>> app = QApplication()
+	>>> with QEventLoop(app) as loop:
+	>>>     y = loop.call_soon(my_task)
+	>>>
+	>>>     assert y == 4
+	"""
+
+	def __init__(self, app=None):
+		super().__init__()
+		self.timers = []
+
+	# Event Loop API
+	def run(self):
+		"""Run the event loop.  Block until there is nothing left to do."""
+		return self.run_forever()
+
+	def __enter__(self):
+		async.set_event_loop(self)
+
+	def __exit__(self, *args):
+		self.stop()
+		async.set_event_loop(None)
+
+	def close(self):
+		self.stop()
+		self.timers = []
+		self.app = None
+
+	@easycallback
+	def call_soon_threadsafe(self, fn, *args):
+		self.call_soon(fn, *args)
+
+	def _create_timer(self, delay, fn, *args, singleshot):
+		timer = QtCore.QTimer(self.app)
+		timer.timeout.connect(partial(fn, *args))
+		if singleshot:
+			timer.timeout.connect(lambda: self.timers.remove(timer))
+		timer.setSingleShot(singleshot)
+		timer.start(delay * 1000)
+
+		self.timers.append(timer)
+
+		return Cancellable(timer)
+
+	def call_later(self, delay, fn, *args):
+		self._create_timer(delay, fn, *args, singleshot=True)
+
+	def call_at(self, at, fn, *args):
+		self.call_later(at - self.time(), fn, *args)
+
+	def time(self):
+		return time.monotonic()
+
+	def run_forever(self):
+		return self.app.exec_()
+
+	def stop(self):
+		super().stop()
+		self.app.exit()
+
+class Cancellable(object):
+	def __init__(self, timer, loop):
+		self.timer = timer
+		self.loop = loop
+
+	def cancel(self):
+		self.loop.remove(timer)
+		return self.timer.stop()
+
+def easycallback(fn):
+	"""
+	Decorator that wraps a callback in a signal, and packs & unpacks arguments,
+	Makes the wrapped function effectively threadsafe. If you call the function
+	from one thread, it will be executed in the thread the QObject has affinity
+	with.
+
+	Remember: only objects that inherit from QObject can support signals/slots
+
+	>>> class MyObject(QObject):
+	>>>     @easycallback
+	>>>     def mycallback(self):
+	>>>         dostuff()
+	>>>
+	>>> myobject = MyObject()
+	>>>
+	>>> @task
+	>>> def mytask():
+	>>>     myobject.mycallback()
+	>>>
+	>>> loop = QEventLoop()
+	>>> with loop:
+	>>>     loop.call_soon(mytask)
+	>>>     loop.run_forever()
+	"""
+	signal = QtCore.pyqtSignal(object, tuple, dict)
+
+	def out_wrapper(self, args, kwargs):
+		return fn(self, *args, **kwargs)
+
+	@wraps(fn)
+	def in_wrapper(self, *args, **kwargs):
+		return signal.emit(self, args, kwargs)
+
+	signal.connect(out_wrapper)
+	return in_wrapper

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list