[Pkg-bitcoin-commits] [python-quamash] 113/269: QEventLoop: Implement add_reader and add_writer

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:22 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit e4b7a600f9646b2aeba8be24146e106fe5a6a78c
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date:   Thu Jul 24 00:04:52 2014 +0200

    QEventLoop: Implement add_reader and add_writer
    
    Conflicts:
    	conftest.py
---
 conftest.py              |  1 +
 quamash/__init__.py      | 24 +++++++++++++
 quamash/_windows.py      | 16 +++++----
 tests/test_qeventloop.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/conftest.py b/conftest.py
index bcebec0..75509de 100644
--- a/conftest.py
+++ b/conftest.py
@@ -5,3 +5,4 @@ sys.path.insert(0, os.path.dirname(__file__))
 logging.basicConfig(
 	level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
 
+
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 6dcccf5..d92540f 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -229,6 +229,8 @@ class QEventLoop(_baseclass):
 		self.__debug_enabled = False
 		self.__default_executor = None
 		self.__exception_handler = None
+		self.__read_notifiers = {}
+		self.__write_notifiers = {}
 
 		assert self.__app is not None
 
@@ -323,6 +325,28 @@ class QEventLoop(_baseclass):
 		"""Get time according to event loop's clock."""
 		return time.monotonic()
 
+	def add_reader(self, fd, callback, *args):
+		"""Register a callback for when a file descriptor is ready for reading."""
+		notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read)
+		notifier.setEnabled(True)
+		self._logger.debug('Adding reader callback for file descriptor {}'.format(fd))
+		notifier.activated.connect(lambda: callback(*args))
+		self.__read_notifiers[fd] = notifier
+
+	def remove_reader(self, fd):
+		raise NotImplementedError
+
+	def add_writer(self, fd, callback, *args):
+		"""Register a callback for when a file descriptor is ready for writing."""
+		notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Write)
+		notifier.setEnabled(True)
+		self._logger.debug('Adding writer callback for file descriptor {}'.format(fd))
+		notifier.activated.connect(lambda: callback(*args))
+		self.__write_notifiers[fd] = notifier
+
+	def remove_writer(self, fd):
+		raise NotImplementedError
+
 	# Methods for interacting with threads.
 
 	@_easycallback
diff --git a/quamash/_windows.py b/quamash/_windows.py
index 95f49fc..9720eb8 100644
--- a/quamash/_windows.py
+++ b/quamash/_windows.py
@@ -16,6 +16,7 @@ from ._common import with_logger
 
 
 class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
+	"""Proactor based event loop."""
 	def __init__(self):
 		QtCore.QObject.__init__(self)
 		asyncio.ProactorEventLoop.__init__(self, _IocpProactor())
@@ -24,6 +25,7 @@ class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
 		self.__event_poller.sig_events.connect(self._process_events)
 
 	def _process_events(self, events):
+		"""Process events from proactor."""
 		for f, callback, transferred, key, ov in events:
 			try:
 				self._logger.debug('Invoking event callback {}'.format(callback))
@@ -35,7 +37,7 @@ class _ProactorEventLoop(QtCore.QObject, asyncio.ProactorEventLoop):
 				f.set_result(value)
 
 	def _before_run_forever(self):
-		self.__event_poller.start(self._selector)
+		self.__event_poller.start(self._proactor)
 
 	def _after_run_forever(self):
 		self.__event_poller.stop()
@@ -105,11 +107,11 @@ class _IocpProactor(windows_events.IocpProactor):
 
 @with_logger
 class _EventWorker(QtCore.QThread):
-	def __init__(self, selector, parent):
+	def __init__(self, proactor, parent):
 		super().__init__()
 
 		self.__stop = False
-		self.__selector = selector
+		self.__proactor = proactor
 		self.__sig_events = parent.sig_events
 		self.__semaphore = QtCore.QSemaphore()
 
@@ -127,7 +129,7 @@ class _EventWorker(QtCore.QThread):
 		self.__semaphore.release()
 
 		while not self.__stop:
-			events = self.__selector.select(0.01)
+			events = self.__proactor.select(0.01)
 			if events:
 				self._logger.debug('Got events from poll: {}'.format(events))
 				self.__sig_events.emit(events)
@@ -140,9 +142,9 @@ class _EventPoller(QtCore.QObject):
 	"""Polling of events in separate thread."""
 	sig_events = QtCore.Signal(list)
 
-	def start(self, selector):
-		self._logger.debug('Starting (selector: {})...'.format(selector))
-		self.__worker = _EventWorker(selector, self)
+	def start(self, proactor):
+		self._logger.debug('Starting (proactor: {})...'.format(proactor))
+		self.__worker = _EventWorker(proactor, self)
 		self.__worker.start()
 
 	def stop(self):
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index d1c1c5f..472f71b 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -8,6 +8,7 @@ import sys
 import ctypes
 import multiprocessing
 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+import socket
 
 from quamash import QtCore, QtGui
 
@@ -195,3 +196,91 @@ def test_get_set_debug(loop):
 	assert loop.get_debug()
 	loop.set_debug(False)
 	assert not loop.get_debug()
+
+
+def _create_sock_pair(port=0):
+	"""Create socket pair.
+
+	If socket.socketpair isn't available, we emulate it.
+	"""
+	# See if socketpair() is available.
+	have_socketpair = hasattr(socket, 'socketpair')
+	if have_socketpair:
+		client_sock, srv_sock = socket.socketpair()
+		return client_sock, srv_sock
+
+	# Create a non-blocking temporary server socket
+	temp_srv_sock = socket.socket()
+	temp_srv_sock.setblocking(False)
+	temp_srv_sock.bind(('', port))
+	port = temp_srv_sock.getsockname()[1]
+	temp_srv_sock.listen(1)
+
+	# Create non-blocking client socket
+	client_sock = socket.socket()
+	client_sock.setblocking(False)
+	try:
+		client_sock.connect(('localhost', port))
+	except socket.error as err:
+		# Error 10035 (operation would block) is not an error, as we're doing this with a
+		# non-blocking socket.
+		if err.errno != 10035:
+			raise
+
+	# Use select to wait for connect() to succeed.
+	import select
+	timeout = 1
+	readable = select.select([temp_srv_sock], [], [], timeout)[0]
+	if temp_srv_sock not in readable:
+		raise Exception('Client socket not connected in {} second(s)'.format(timeout))
+	srv_sock, _ = temp_srv_sock.accept()
+
+	return client_sock, srv_sock
+
+
+def test_can_add_reader(loop):
+	"""Verify that we can add a reader callback to an event loop."""
+	def can_read():
+		data = srv_sock.recv(1)
+		if len(data) != 1:
+			return
+
+		nonlocal got_msg
+		got_msg = data
+		# Indicate that we're done
+		fut.set_result(None)
+
+	def write():
+		client_sock.send(ref_msg)
+		client_sock.close()
+
+	ref_msg = b'a'
+	client_sock, srv_sock = _create_sock_pair()
+	try:
+		loop.call_soon(write)
+
+		got_msg = None
+		fut = asyncio.Future()
+		loop.add_reader(srv_sock.fileno(), can_read)
+		loop.run_until_complete(asyncio.wait_for(fut, timeout=1.0))
+	finally:
+		client_sock.close()
+		srv_sock.close()
+
+	assert got_msg == ref_msg
+
+
+def test_can_add_writer(loop):
+	"""Verify that we can add a writer callback to an event loop."""
+	def can_write():
+		# Indicate that we're done
+		fut.set_result(None)
+
+	client_sock, srv_sock = _create_sock_pair()
+	try:
+		fut = asyncio.Future()
+		loop.add_writer(client_sock.fileno(), can_write)
+		loop.run_until_complete(asyncio.wait_for(fut, timeout=1.0))
+	finally:
+		client_sock.close()
+		srv_sock.close()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list