[Pkg-bitcoin-commits] [python-quamash] 43/269: Add test for killing of subprocess
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:14 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit c25477a30624c5c05699084bec968d9d3819ca1e
Author: Arve Knudsen <arve.knudsen at gmail.com>
Date: Thu Jul 3 14:50:36 2014 +0200
Add test for killing of subprocess
---
tests/test_qeventloop.py | 172 +++++++++++++++++++++++++----------------------
1 file changed, 93 insertions(+), 79 deletions(-)
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index 2c47a6e..0813088 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -1,79 +1,93 @@
-import asyncio
-import os.path
-import logging
-import sys
-import locale
-try:
- from PyQt5.QtWidgets import QApplication
-except ImportError:
- from PySide.QtGui import QApplication
-import pytest
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-import quamash
-
-
-logging.basicConfig(level=logging.DEBUG,
- format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
-
-
-class _SubprocessProtocol(asyncio.SubprocessProtocol):
- def __init__(self, *args, **kwds):
- super(_SubprocessProtocol, self).__init__(*args, **kwds)
- self.received_stdout = None
-
- def pipe_data_received(self, fd, data):
- text = data.decode(locale.getpreferredencoding(False))
- if fd == 1:
- self.received_stdout = text.strip()
-
- def process_exited(self):
- asyncio.get_event_loop().stop()
-
-
- at pytest.fixture
-def loop(request):
- app = QApplication([])
- lp = quamash.QEventLoop(app)
- asyncio.set_event_loop(lp)
-
- def fin():
- try:
- lp.close()
- finally:
- asyncio.set_event_loop(None)
-
- request.addfinalizer(fin)
- return lp
-
-
-class TestQEventLoop:
- def test_can_run_tasks_in_default_executor(self, loop):
- """Verify that tasks can be run in default (threaded) executor."""
- def blocking_func():
- nonlocal was_invoked
- was_invoked = True
-
- @asyncio.coroutine
- def blocking_task():
- yield from loop.run_in_executor(None, blocking_func)
-
- was_invoked = False
- loop.run_until_complete(blocking_task())
-
- assert was_invoked
-
- def test_can_execute_subprocess(self, loop):
- transport, protocol = loop.run_until_complete(loop.subprocess_exec(
- _SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
- loop.run_forever()
- assert transport.get_returncode() == 0
- assert protocol.received_stdout == 'Hello async world!'
-
- def test_can_function_as_context_manager(self):
- app = QApplication([])
-
- with quamash.QEventLoop(app) as loop:
- assert isinstance(loop, quamash.QEventLoop)
- loop.call_soon(loop.stop)
- loop.run_forever()
+import asyncio
+import os.path
+import logging
+import sys
+import locale
+try:
+ from PyQt5.QtWidgets import QApplication
+except ImportError:
+ from PySide.QtGui import QApplication
+import pytest
+
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+import quamash
+
+
+logging.basicConfig(level=logging.DEBUG,
+ format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+
+
+class _SubprocessProtocol(asyncio.SubprocessProtocol):
+ def __init__(self, *args, **kwds):
+ super(_SubprocessProtocol, self).__init__(*args, **kwds)
+ self.received_stdout = None
+
+ def pipe_data_received(self, fd, data):
+ text = data.decode(locale.getpreferredencoding(False))
+ if fd == 1:
+ self.received_stdout = text.strip()
+
+ def process_exited(self):
+ asyncio.get_event_loop().stop()
+
+
+ at pytest.fixture
+def loop(request):
+ app = QApplication([])
+ lp = quamash.QEventLoop(app)
+ asyncio.set_event_loop(lp)
+
+ def fin():
+ try:
+ lp.close()
+ finally:
+ asyncio.set_event_loop(None)
+
+ request.addfinalizer(fin)
+ return lp
+
+
+class TestQEventLoop:
+ def test_can_run_tasks_in_default_executor(self, loop):
+ """Verify that tasks can be run in default (threaded) executor."""
+ def blocking_func():
+ nonlocal was_invoked
+ was_invoked = True
+
+ @asyncio.coroutine
+ def blocking_task():
+ yield from loop.run_in_executor(None, blocking_func)
+
+ was_invoked = False
+ loop.run_until_complete(blocking_task())
+
+ assert was_invoked
+
+ def test_can_execute_subprocess(self, loop):
+ """Verify that a subprocess can be executed."""
+ transport, protocol = loop.run_until_complete(loop.subprocess_exec(
+ _SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
+ loop.run_forever()
+ assert transport.get_returncode() == 0
+ assert protocol.received_stdout == 'Hello async world!'
+
+ def test_can_terminate_subprocess(self, loop):
+ """Verify that a subprocess can be terminated."""
+ # Start a never-ending process
+ transport = loop.run_until_complete(loop.subprocess_exec(
+ _SubprocessProtocol, 'python', '-c', 'import time\nwhile True: time.sleep(1)'))[0]
+ # Terminate!
+ transport.kill()
+ # Wait for process to die
+ loop.run_forever()
+
+ assert transport.get_returncode() != 0
+
+ def test_can_function_as_context_manager(self):
+ """Verify that a QEventLoop can function as its own context manager."""
+ app = QApplication([])
+
+ with quamash.QEventLoop(app) as loop:
+ assert isinstance(loop, quamash.QEventLoop)
+ loop.call_soon(loop.stop)
+ loop.run_forever()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list