[pytango] 116/122: Add unittests for tango events

Sandor Bodo-Merle sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:18:24 UTC 2017


This is an automated email from the git hooks/post-receive script.

sbodomerle-guest pushed a commit to tag v9.2.1
in repository pytango.

commit 2f2ec27466f56efad5896d7093bc5ae6fff066bf
Author: Vincent Michel <vincent.michel at maxlab.lu.se>
Date:   Wed Jan 18 18:53:29 2017 +0100

    Add unittests for tango events
---
 tests/test_event.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 84 insertions(+)

diff --git a/tests/test_event.py b/tests/test_event.py
new file mode 100644
index 0000000..ff2abff
--- /dev/null
+++ b/tests/test_event.py
@@ -0,0 +1,84 @@
+
+# Imports
+
+import socket
+from functools import partial
+
+import pytest
+
+from tango import EventType, GreenMode, DeviceProxy
+from tango.server import Device
+from tango.server import command, attribute
+from tango.test_utils import DeviceTestContext
+
+from tango.gevent import DeviceProxy as gevent_DeviceProxy
+from tango.futures import DeviceProxy as futures_DeviceProxy
+from tango.asyncio import DeviceProxy as asyncio_DeviceProxy
+
+# Helpers
+
+device_proxy_map = {
+    GreenMode.Synchronous: DeviceProxy,
+    GreenMode.Futures: futures_DeviceProxy,
+    GreenMode.Asyncio: partial(asyncio_DeviceProxy, wait=True),
+    GreenMode.Gevent: gevent_DeviceProxy}
+
+
+def get_open_port():
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(("", 0))
+    s.listen(1)
+    port = s.getsockname()[1]
+    s.close()
+    return port
+
+
+# Test device
+
+class EventDevice(Device):
+
+    def init_device(self):
+        self.set_change_event("attr", True, False)
+
+    @attribute
+    def attr(self):
+        return 0.
+
+    @command
+    def send_event(self):
+        self.push_change_event("attr", 1.)
+
+
+# Device fixture
+
+@pytest.fixture(params=[GreenMode.Synchronous,
+                        GreenMode.Futures,
+                        GreenMode.Asyncio,
+                        GreenMode.Gevent],
+                scope="module")
+def event_device(request):
+    green_mode = request.param
+    # Hack: a port has to be specified explicitly for events to work
+    port = get_open_port()
+    context = DeviceTestContext(EventDevice, port=port, process=True)
+    with context:
+        yield device_proxy_map[green_mode](context.get_device_access())
+
+
+# Tests
+
+def test_subscribe_event(event_device):
+    results = []
+
+    def callback(evt):
+        results.append(evt.attr_value.value)
+
+    event_device.subscribe_event(
+        "attr", EventType.CHANGE_EVENT, callback, wait=True)
+    event_device.command_inout("send_event", wait=True)
+    # Wait for tango event
+    event_device.read_attribute("state", wait=True)
+    event_device.read_attribute("state", wait=True)
+    event_device.read_attribute("state", wait=True)
+    # Test the event values
+    assert results == [0., 1.]

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git



More information about the debian-science-commits mailing list