[pytango] 18/25: Add asyncio support for servers

Sandor Bodo-Merle sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:17:17 UTC 2017


This is an automated email from the git hooks/post-receive script.

sbodomerle-guest pushed a commit to tag v8.1.9
in repository pytango.

commit 4eafbdf961c0da3bd5050c92887c1084d1cd6b7d
Author: Vincent Michel <vincent.michel at maxlab.lu.se>
Date:   Tue Aug 9 11:34:54 2016 +0200

    Add asyncio support for servers
---
 src/boost/python/asyncio_tools.py | 100 ++++++++++++++++++++++++++++++++
 src/boost/python/server.py        | 118 +++++++++++++++++++++++++++++---------
 2 files changed, 191 insertions(+), 27 deletions(-)

diff --git a/src/boost/python/asyncio_tools.py b/src/boost/python/asyncio_tools.py
new file mode 100644
index 0000000..55a0ca0
--- /dev/null
+++ b/src/boost/python/asyncio_tools.py
@@ -0,0 +1,100 @@
+"""Backport some asyncio features."""
+from __future__ import absolute_import
+
+import concurrent.futures
+try:
+    import asyncio
+except ImportError:
+    import trollius as asyncio
+
+__all__ = ["run_coroutine_threadsafe"]
+
+
+def _set_concurrent_future_state(concurrent, source):
+    """Copy state from a future to a concurrent.futures.Future."""
+    assert source.done()
+    if source.cancelled():
+        concurrent.cancel()
+    if not concurrent.set_running_or_notify_cancel():
+        return
+    exception = source.exception()
+    if exception is not None:
+        concurrent.set_exception(exception)
+    else:
+        result = source.result()
+        concurrent.set_result(result)
+
+
+def _copy_future_state(source, dest):
+    """Internal helper to copy state from another Future.
+    The other Future may be a concurrent.futures.Future.
+    """
+    assert source.done()
+    if dest.cancelled():
+        return
+    assert not dest.done()
+    if source.cancelled():
+        dest.cancel()
+    else:
+        exception = source.exception()
+        if exception is not None:
+            dest.set_exception(exception)
+        else:
+            result = source.result()
+            dest.set_result(result)
+
+
+def _chain_future(source, dest):
+    """Chain two futures so that when one completes, so does the other.
+    The result (or exception) of source will be copied to destination.
+    If destination is cancelled, source gets cancelled too.
+    Compatible with both asyncio.Future and concurrent.futures.Future.
+    """
+    if not isinstance(source, (asyncio.Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for source argument')
+    if not isinstance(dest, (asyncio.Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for destination argument')
+    source_loop = source._loop if isinstance(source, asyncio.Future) else None
+    dest_loop = dest._loop if isinstance(dest, asyncio.Future) else None
+
+    def _set_state(future, other):
+        if isinstance(future, asyncio.Future):
+            _copy_future_state(other, future)
+        else:
+            _set_concurrent_future_state(future, other)
+
+    def _call_check_cancel(destination):
+        if destination.cancelled():
+            if source_loop is None or source_loop is dest_loop:
+                source.cancel()
+            else:
+                source_loop.call_soon_threadsafe(source.cancel)
+
+    def _call_set_state(source):
+        if dest_loop is None or dest_loop is source_loop:
+            _set_state(dest, source)
+        else:
+            dest_loop.call_soon_threadsafe(_set_state, dest, source)
+
+    dest.add_done_callback(_call_check_cancel)
+    source.add_done_callback(_call_set_state)
+
+
+def run_coroutine_threadsafe(coro, loop):
+    """Submit a coroutine object to a given event loop.
+    Return a concurrent.futures.Future to access the result.
+    """
+    if not asyncio.iscoroutine(coro):
+        raise TypeError('A coroutine object is required')
+    future = concurrent.futures.Future()
+
+    def callback():
+        try:
+            _chain_future(asyncio.async(coro, loop=loop), future)
+        except Exception as exc:
+            if future.set_running_or_notify_cancel():
+                future.set_exception(exc)
+            raise
+
+    loop.call_soon_threadsafe(callback)
+    return future
diff --git a/src/boost/python/server.py b/src/boost/python/server.py
index d7a8cc3..1efb7b7 100644
--- a/src/boost/python/server.py
+++ b/src/boost/python/server.py
@@ -17,7 +17,7 @@ from __future__ import absolute_import
 
 __all__ = ["DeviceMeta", "Device", "LatestDeviceImpl", "attribute",
            "command", "device_property", "class_property",
-           "run", "server_run", "Server", "get_worker", "get_gevent_worker"]
+           "run", "server_run", "Server", "get_worker", "get_async_worker"]
 
 import os
 import sys
@@ -553,6 +553,8 @@ class Device(LatestDeviceImpl):
         if args is None:
             args = sys.argv[1:]
         args = [cls.__name__] + list(args)
+        green_mode = getattr(cls, 'green_mode', None)
+        kwargs.setdefault("green_mode", green_mode)
         return run((cls,), args, **kwargs)
 
 
@@ -976,7 +978,7 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
     if green_mode is None:
         from PyTango import get_green_mode
         green_mode = get_green_mode()
-    gevent_mode = green_mode == GreenMode.Gevent
+    async_mode = green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
 
     import PyTango
     if msg_stream is None:
@@ -992,15 +994,15 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
     if util is None:
         util = PyTango.Util(args)
 
-    if gevent_mode:
+    if async_mode:
         util.set_serial_model(PyTango.SerialModel.NO_SYNC)
-        worker = _create_gevent_worker()
+        worker = _create_async_worker(green_mode)
         set_worker(worker)
 
     worker = get_worker()
 
     if event_loop is not None:
-        if gevent_mode:
+        if async_mode:
             event_loop = functools.partial(worker.execute, event_loop)
         util.server_set_event_loop(event_loop)
 
@@ -1016,10 +1018,10 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
         worker.stop()
         log.debug("server loop exit")
 
-    if gevent_mode:
+    if async_mode:
         tango_thread_id = worker.run_in_thread(tango_loop)
         worker.run()
-        log.debug("gevent worker finished")
+        log.debug("async worker finished")
     else:
         tango_loop()
 
@@ -1221,17 +1223,24 @@ def set_worker(worker):
     __WORKER = worker
 
 
-__GEVENT_WORKER = None
-def get_gevent_worker():
-    global __GEVENT_WORKER
-    return __GEVENT_WORKER
+__ASYNC_WORKER = None
+def get_async_worker():
+    global __ASYNC_WORKER
+    return __ASYNC_WORKER
 
 
-def _create_gevent_worker():
-    global __GEVENT_WORKER
-    if __GEVENT_WORKER:
-        return __GEVENT_WORKER
+def _create_async_worker(green_mode):
+    global __ASYNC_WORKER
+    if __ASYNC_WORKER:
+        return __ASYNC_WORKER
+    if green_mode == GreenMode.Gevent:
+        _ASYNC_WORKER = _create_gevent_worker()
+    if green_mode == GreenMode.Asyncio:
+        _ASYNC_WORKER = _create_asyncio_worker()
+    return _ASYNC_WORKER
+
 
+def _create_gevent_worker():
     try:
         from queue import Queue
     except:
@@ -1301,8 +1310,63 @@ def _create_gevent_worker():
             self.__tasks.put(task)
             self.__watcher.send()
 
-    __GEVENT_WORKER = GeventWorker()
-    return __GEVENT_WORKER
+    return GeventWorker()
+
+
+def _create_asyncio_worker():
+    import concurrent.futures
+
+    try:
+        import asyncio
+    except ImportError:
+        import trollius as asyncio
+
+    try:
+        from asyncio import run_coroutine_threadsafe
+    except ImportError:
+        from .asyncio_tools import run_coroutine_threadsafe
+
+    class LoopExecutor(concurrent.futures.Executor):
+        """An Executor subclass that uses an event loop
+        to execute calls asynchronously."""
+
+        def __init__(self, loop=None):
+            """Initialize the executor with a given loop."""
+            self.loop = loop or asyncio.get_event_loop()
+
+        def submit(self, fn, *args, **kwargs):
+            """Schedule the callable fn, to be executed as fn(*args **kwargs).
+            Return a Future representing the execution of the callable."""
+            corofn = asyncio.coroutine(lambda: fn(*args, **kwargs))
+            return run_coroutine_threadsafe(corofn(), loop)
+
+        def run_in_thread(self, func, *args, **kwargs):
+            """Schedule a blocking callback."""
+            callback = lambda: func(*args, **kwargs)
+            coro = self.loop.run_in_executor(None, callback)
+            # That is not actually necessary since coro is actually
+            # a future. But it is an implementation detail and it
+            # might be changed later on.
+            asyncio.async(coro)
+
+        def run(self, timeout=None):
+            """Run the asyncio event loop."""
+            self.loop.run_forever()
+
+        def stop(self):
+            """Run the asyncio event loop."""
+            self.loop.stop()
+
+        def execute(self, fn, *args, **kwargs):
+            """Execute the callable fn as fn(*args **kwargs)."""
+            return self.submit(fn, *args, **kwargs).result()
+
+    try:
+        loop = asyncio.get_event_loop()
+    except RuntimeError:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+    return LoopExecutor(loop=loop)
 
 
 _CLEAN_UP_TEMPLATE = """
@@ -1542,8 +1606,8 @@ class Server:
         self.__protocol = protocol
         self.__tango_classes = _to_classes(tango_classes or [])
         self.__tango_devices = []
-        if self.gevent_mode:
-            self.__worker = _create_gevent_worker()
+        if self.async_mode:
+            self.__worker = _create_async_worker(self.green_mode)
         else:
             self.__worker = get_worker()
         set_worker(self.__worker)
@@ -1652,13 +1716,13 @@ class Server:
 
     def __initialize(self):
         self.log.debug("initialize")
-        gevent_mode = self.gevent_mode
+        async_mode = self.async_mode
         event_loop = self.__event_loop_callback
 
         util = self.tango_util
         u_instance = util.instance()
 
-        if gevent_mode:
+        if async_mode:
             if event_loop:
                 event_loop = functools.partial(self.worker.execute,
                                                event_loop)
@@ -1667,11 +1731,11 @@ class Server:
 
         _add_classes(util, self.__tango_classes)
 
-        if gevent_mode:
+        if async_mode:
             tango_thread_id = self.worker.run_in_thread(self.__tango_loop)
 
     def __run(self, timeout=None):
-        if self.gevent_mode:
+        if self.async_mode:
             return self.worker.run(timeout=timeout)
         else:
             self.__tango_loop()
@@ -1737,8 +1801,8 @@ class Server:
         self.__green_mode = gm
 
     @property
-    def gevent_mode(self):
-        return self.green_mode == GreenMode.Gevent
+    def async_mode(self):
+        return self.green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
 
     @property
     def worker(self):
@@ -1840,12 +1904,12 @@ class Server:
 
     def run(self, timeout=None):
         self.log.debug("run")
-        gevent_mode = self.gevent_mode
+        async_mode = self.async_mode
         running = self.__running
         if not running:
             self.__prepare()
             self.__initialize()
         else:
-            if not gevent_mode:
+            if not async_mode:
                 raise RuntimeError("Server is already running")
         self.__run(timeout=timeout)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git



More information about the debian-science-commits mailing list