[Pkg-bitcoin-commits] [python-quamash] 113/269: QEventLoop: Implement add_reader and add_writer
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:22 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit e4b7a600f9646b2aeba8be24146e106fe5a6a78c
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Thu Jul 24 00:04:52 2014 +0200
QEventLoop: Implement add_reader and add_writer
Conflicts:
conftest.py
---
conftest.py | 1 +
quamash/__init__.py | 24 +++++++++++++
quamash/_windows.py | 16 +++++----
tests/test_qeventloop.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 123 insertions(+), 7 deletions(-)
diff --git a/conftest.py b/conftest.py
index bcebec0..75509de 100644
--- a/conftest.py
+++ b/conftest.py
@@ -5,3 +5,4 @@ sys.path.insert(0, os.path.dirname(__file__))
logging.basicConfig(
level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 6dcccf5..d92540f 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -229,6 +229,8 @@ class QEventLoop(_baseclass):
self.__debug_enabled = False
self.__default_executor = None
self.__exception_handler = None
+ self.__read_notifiers = {}
+ self.__write_notifiers = {}
assert self.__app is not None
@@ -323,6 +325,28 @@ class QEventLoop(_baseclass):
"""Get time according to event loop's clock."""
return time.monotonic()
+ def add_reader(self, fd, callback, *args):
+ """Register a callback for when a file descriptor is ready for reading."""
+ notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read)
+ notifier.setEnabled(True)
+ self._logger.debug('Adding reader callback for file descriptor {}'.format(fd))
+ notifier.activated.connect(lambda: callback(*args))
+ self.__read_notifiers[fd] = notifier
+
+ def remove_reader(self, fd):
+ raise NotImplementedError
+
+ def add_writer(self, fd, callback, *args):
+ """Register a callback for when a file descriptor is ready for writing."""
+ notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Write)
+ notifier.setEnabled(True)
+ self._logger.debug('Adding writer callback for file descriptor {}'.format(fd))
+ notifier.activated.connect(lambda: callback(*args))
+ self.__write_notifiers[fd] = notifier
+
+ def remove_writer(self, fd):
+ raise NotImplementedError
+
# Methods for interacting with threads.
@_easycallback
diff --git a/quamash/_windows.py b/quamash/_windows.py
index 95f49fc..9720eb8 100644
--- a/quamash/_windows.py
+++ b/quamash/_windows.py
@@ -16,6 +16,7 @@ from ._common import with_logger
class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
+ """Proactor based event loop."""
def __init__(self):
QtCore.QObject.__init__(self)
asyncio.ProactorEventLoop.__init__(self, _IocpProactor())
@@ -24,6 +25,7 @@ class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
self.__event_poller.sig_events.connect(self._process_events)
def _process_events(self, events):
+ """Process events from proactor."""
for f, callback, transferred, key, ov in events:
try:
self._logger.debug('Invoking event callback {}'.format(callback))
@@ -35,7 +37,7 @@ class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
f.set_result(value)
def _before_run_forever(self):
- self.__event_poller.start(self._selector)
+ self.__event_poller.start(self._proactor)
def _after_run_forever(self):
self.__event_poller.stop()
@@ -105,11 +107,11 @@ class _IocpProactor(windows_events.IocpProactor):
@with_logger
class _EventWorker(QtCore.QThread):
- def __init__(self, selector, parent):
+ def __init__(self, proactor, parent):
super().__init__()
self.__stop = False
- self.__selector = selector
+ self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()
@@ -127,7 +129,7 @@ class _EventWorker(QtCore.QThread):
self.__semaphore.release()
while not self.__stop:
- events = self.__selector.select(0.01)
+ events = self.__proactor.select(0.01)
if events:
self._logger.debug('Got events from poll: {}'.format(events))
self.__sig_events.emit(events)
@@ -140,9 +142,9 @@ class _EventPoller(QtCore.QObject):
"""Polling of events in separate thread."""
sig_events = QtCore.Signal(list)
- def start(self, selector):
- self._logger.debug('Starting (selector: {})...'.format(selector))
- self.__worker = _EventWorker(selector, self)
+ def start(self, proactor):
+ self._logger.debug('Starting (proactor: {})...'.format(proactor))
+ self.__worker = _EventWorker(proactor, self)
self.__worker.start()
def stop(self):
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index d1c1c5f..472f71b 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -8,6 +8,7 @@ import sys
import ctypes
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+import socket
from quamash import QtCore, QtGui
@@ -195,3 +196,91 @@ def test_get_set_debug(loop):
assert loop.get_debug()
loop.set_debug(False)
assert not loop.get_debug()
+
+
+def _create_sock_pair(port=0):
+ """Create socket pair.
+
+ If socket.socketpair isn't available, we emulate it.
+ """
+ # See if socketpair() is available.
+ have_socketpair = hasattr(socket, 'socketpair')
+ if have_socketpair:
+ client_sock, srv_sock = socket.socketpair()
+ return client_sock, srv_sock
+
+ # Create a non-blocking temporary server socket
+ temp_srv_sock = socket.socket()
+ temp_srv_sock.setblocking(False)
+ temp_srv_sock.bind(('', port))
+ port = temp_srv_sock.getsockname()[1]
+ temp_srv_sock.listen(1)
+
+ # Create non-blocking client socket
+ client_sock = socket.socket()
+ client_sock.setblocking(False)
+ try:
+ client_sock.connect(('localhost', port))
+ except socket.error as err:
+ # Error 10035 (operation would block) is not an error, as we're doing this with a
+ # non-blocking socket.
+ if err.errno != 10035:
+ raise
+
+ # Use select to wait for connect() to succeed.
+ import select
+ timeout = 1
+ readable = select.select([temp_srv_sock], [], [], timeout)[0]
+ if temp_srv_sock not in readable:
+ raise Exception('Client socket not connected in {} second(s)'.format(timeout))
+ srv_sock, _ = temp_srv_sock.accept()
+
+ return client_sock, srv_sock
+
+
+def test_can_add_reader(loop):
+ """Verify that we can add a reader callback to an event loop."""
+ def can_read():
+ data = srv_sock.recv(1)
+ if len(data) != 1:
+ return
+
+ nonlocal got_msg
+ got_msg = data
+ # Indicate that we're done
+ fut.set_result(None)
+
+ def write():
+ client_sock.send(ref_msg)
+ client_sock.close()
+
+ ref_msg = b'a'
+ client_sock, srv_sock = _create_sock_pair()
+ try:
+ loop.call_soon(write)
+
+ got_msg = None
+ fut = asyncio.Future()
+ loop.add_reader(srv_sock.fileno(), can_read)
+ loop.run_until_complete(asyncio.wait_for(fut, timeout=1.0))
+ finally:
+ client_sock.close()
+ srv_sock.close()
+
+ assert got_msg == ref_msg
+
+
+def test_can_add_writer(loop):
+ """Verify that we can add a writer callback to an event loop."""
+ def can_write():
+ # Indicate that we're done
+ fut.set_result(None)
+
+ client_sock, srv_sock = _create_sock_pair()
+ try:
+ fut = asyncio.Future()
+ loop.add_writer(client_sock.fileno(), can_write)
+ loop.run_until_complete(asyncio.wait_for(fut, timeout=1.0))
+ finally:
+ client_sock.close()
+ srv_sock.close()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list