[Pkg-bitcoin-commits] [python-quamash] 76/269: Fixes to tests, mostly doctests.

Jonas Smedegaard dr at jones.dk
Fri Nov 24 11:26:18 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository python-quamash.

commit 672b8f2a5e024897470b426a642f813ddc42f796
Author: Mark Harviston <mark.harviston at gmail.com>
Date:   Thu Jul 10 21:18:39 2014 -0700

    Fixes to tests, mostly doctests.
    
    Doctests now actually work; before, they did not.
    
    test_qeventloop: removed a totally superfluous class and replaced it with top-level test functions (personal preference).
---
 pytest.ini               |  2 ++
 quamash/__init__.py      | 66 ++++++++++++++++++++++++++-------------
 tests/test_qeventloop.py | 81 ++++++++++++++++++++++++------------------------
 3 files changed, 87 insertions(+), 62 deletions(-)

diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..e38c40a
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+addopts=--doctest-modules quamash tests
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 89db1e5..ef30d0e 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -72,11 +72,12 @@ class QThreadExecutor(QtCore.QObject):
 	"""
 	ThreadExecutor that produces QThreads
 	Same API as `concurrent.futures.Executor`
-
+	
+	>>> from quamash import QThreadExecutor
 	>>> with QThreadExecutor(5) as executor:
-	>>>     f = executor.submit(lambda x: 2 + x, x)
-	>>>     r = f.result()
-	>>>     assert r == 4
+	...     f = executor.submit(lambda x: 2 + x, 2)
+	...     r = f.result()
+	...     assert r == 4
 	"""
 	def __init__(self, max_workers=10, parent=None):
 		super().__init__(parent)
@@ -121,21 +122,38 @@ def _easycallback(fn):
 
 	Remember: only objects that inherit from QObject can support signals/slots
 
+	>>> try: from PyQt5.QtCore import QThread, QObject
+	... except ImportError: from PySide.QtCore import QThread, QObject
+	>>>
+	>>> try: from PyQt5.QtWidgets import QApplication
+	... except ImportError: from PySide.QtGui import QApplication
+	>>>
+	>>> app = QApplication([])
+	>>>
+	>>> from quamash import QEventLoop, _easycallback
+	>>> import asyncio
+	>>>
+	>>> global_thread = QThread.currentThread()
 	>>> class MyObject(QObject):
-	>>>     @_easycallback
-	>>>     def mycallback(self):
-	>>>         dostuff()
+	...     @_easycallback
+	...     def mycallback(self):
+	...         global global_thread, mythread
+	...         cur_thread = QThread.currentThread()
+	...         assert cur_thread is not global_thread
+	...         assert cur_thread is mythread
 	>>>
+	>>> mythread = QThread()
+	>>> mythread.start()
 	>>> myobject = MyObject()
+	>>> myobject.moveToThread(mythread)
 	>>>
-	>>> @task
-	>>> def mytask():
-	>>>     myobject.mycallback()
+	>>> @asyncio.coroutine
+	... def mycoroutine():
+	...     myobject.mycallback()
 	>>>
-	>>> loop = QEventLoop()
+	>>> loop = QEventLoop(app)
 	>>> with loop:
-	>>>     loop.call_soon(mytask)
-	>>>     loop.run_forever()
+	...     loop.run_until_complete(mycoroutine())
 	"""
 	@wraps(fn)
 	def in_wrapper(self, *args, **kwargs):
@@ -161,15 +179,20 @@ else:
 class QEventLoop(_baseclass):
 	"""
 	Implementation of asyncio event loop that uses the Qt Event loop
-	>>> @quamash.task
-	>>> def my_task(x):
-	>>>     return x + 2
+	>>> import quamash, asyncio
+	>>> try: from PyQt5.QtWidgets import QApplication
+	... except ImportError: from PySide.QtGui import QApplication
 	>>>
-	>>> app = QApplication()
-	>>> with QEventLoop(app) as loop:
-	>>>     y = loop.call_soon(my_task)
+	>>> app = QApplication([])
 	>>>
-	>>>     assert y == 4
+	>>> @asyncio.coroutine
+	... def xplusy(x, y):
+	...     yield from asyncio.sleep(2)
+	...     assert x + y == 4
+	...     yield from asyncio.sleep(2)
+	>>> 
+	>>> with QEventLoop(app) as loop:
+	...     loop.run_until_complete(xplusy(2,2))
 	"""
 	def __init__(self, app):
 		self.__timers = []
@@ -396,6 +419,8 @@ class QEventLoop(_baseclass):
 		except:
 			sys.stderr.write('{}, {}\n'.format(args, kwds))
 
+	def remove(self, timer):
+		self.__timers.remove(timer)
 
 class _Cancellable:
 	def __init__(self, timer, loop):
@@ -403,5 +428,4 @@ class _Cancellable:
 		self.__loop = loop
 
 	def cancel(self):
-		self.__loop.remove(timer)
 		self.__timer.stop()
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index f635bb1..f5db5a2 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -52,45 +52,44 @@ def loop(request, application):
 	return lp
 
 
-class TestQEventLoop:
-	def test_can_run_tasks_in_default_executor(self, loop):
-		"""Verify that tasks can be run in default (threaded) executor."""
-		def blocking_func():
-			nonlocal was_invoked
-			was_invoked = True
-
-		@asyncio.coroutine
-		def blocking_task():
-			yield from loop.run_in_executor(None, blocking_func)
-
-		was_invoked = False
-		loop.run_until_complete(blocking_task())
-
-		assert was_invoked
-
-	def test_can_execute_subprocess(self, loop):
-		"""Verify that a subprocess can be executed."""
-		transport, protocol = loop.run_until_complete(loop.subprocess_exec(
-			_SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
+def test_can_run_tasks_in_default_executor(loop):
+	"""Verify that tasks can be run in default (threaded) executor."""
+	def blocking_func():
+		nonlocal was_invoked
+		was_invoked = True
+
+	@asyncio.coroutine
+	def blocking_task():
+		yield from loop.run_in_executor(None, blocking_func)
+
+	was_invoked = False
+	loop.run_until_complete(blocking_task())
+
+	assert was_invoked
+
+def test_can_execute_subprocess(loop):
+	"""Verify that a subprocess can be executed."""
+	transport, protocol = loop.run_until_complete(loop.subprocess_exec(
+		_SubprocessProtocol, sys.executable or 'python', '-c', 'print(\'Hello async world!\')'))
+	loop.run_forever()
+	assert transport.get_returncode() == 0
+	assert protocol.received_stdout == 'Hello async world!'
+
+def test_can_terminate_subprocess(loop):
+	"""Verify that a subprocess can be terminated."""
+	# Start a never-ending process
+	transport = loop.run_until_complete(loop.subprocess_exec(
+			_SubprocessProtocol, sys.executable or 'python', '-c', 'import time\nwhile True: time.sleep(1)'))[0]
+	# Terminate!
+	transport.kill()
+	# Wait for process to die
+	loop.run_forever()
+
+	assert transport.get_returncode() != 0
+
+def test_can_function_as_context_manager(application):
+	"""Verify that a QEventLoop can function as its own context manager."""
+	with quamash.QEventLoop(application) as loop:
+		assert isinstance(loop, quamash.QEventLoop)
+		loop.call_soon(loop.stop)
 		loop.run_forever()
-		assert transport.get_returncode() == 0
-		assert protocol.received_stdout == 'Hello async world!'
-
-	def test_can_terminate_subprocess(self, loop):
-		"""Verify that a subprocess can be terminated."""
-		# Start a never-ending process
-		transport = loop.run_until_complete(loop.subprocess_exec(
-				_SubprocessProtocol, 'python', '-c', 'import time\nwhile True: time.sleep(1)'))[0]
-		# Terminate!
-		transport.kill()
-		# Wait for process to die
-		loop.run_forever()
-
-		assert transport.get_returncode() != 0
-
-	def test_can_function_as_context_manager(self, application):
-		"""Verify that a QEventLoop can function as its own context manager."""
-		with quamash.QEventLoop(application) as loop:
-			assert isinstance(loop, quamash.QEventLoop)
-			loop.call_soon(loop.stop)
-			loop.run_forever()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git



More information about the Pkg-bitcoin-commits mailing list