[pytango] 18/25: Add asyncio support for servers
Sandor Bodo-Merle
sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:17:17 UTC 2017
This is an automated email from the git hooks/post-receive script.
sbodomerle-guest pushed a commit to tag v8.1.9
in repository pytango.
commit 4eafbdf961c0da3bd5050c92887c1084d1cd6b7d
Author: Vincent Michel <vincent.michel at maxlab.lu.se>
Date: Tue Aug 9 11:34:54 2016 +0200
Add asyncio support for servers
---
src/boost/python/asyncio_tools.py | 100 ++++++++++++++++++++++++++++++++
src/boost/python/server.py | 118 +++++++++++++++++++++++++++++---------
2 files changed, 191 insertions(+), 27 deletions(-)
diff --git a/src/boost/python/asyncio_tools.py b/src/boost/python/asyncio_tools.py
new file mode 100644
index 0000000..55a0ca0
--- /dev/null
+++ b/src/boost/python/asyncio_tools.py
@@ -0,0 +1,100 @@
+"""Backport some asyncio features."""
+from __future__ import absolute_import
+
+import concurrent.futures
+try:
+ import asyncio
+except ImportError:
+ import trollius as asyncio
+
+__all__ = ["run_coroutine_threadsafe"]
+
+
+def _set_concurrent_future_state(concurrent, source):
+ """Copy state from a future to a concurrent.futures.Future.
+
+ ``source`` must already be done (asserted). Its cancellation,
+ exception or result is transferred to ``concurrent``.
+ Note: the ``concurrent`` parameter shadows the ``concurrent``
+ package imported by this module for the duration of the call.
+ """
+ assert source.done()
+ if source.cancelled():
+ concurrent.cancel()
+ # set_running_or_notify_cancel() returns False for a cancelled
+ # future, in which case there is no result or exception to copy.
+ if not concurrent.set_running_or_notify_cancel():
+ return
+ exception = source.exception()
+ if exception is not None:
+ concurrent.set_exception(exception)
+ else:
+ result = source.result()
+ concurrent.set_result(result)
+
+
+def _copy_future_state(source, dest):
+ """Internal helper to copy state from another Future.
+ The other Future may be a concurrent.futures.Future.
+
+ ``source`` must be done (asserted). Nothing is copied when ``dest``
+ is already cancelled; otherwise ``dest`` must not be done yet
+ (asserted) and receives the cancellation, exception or result
+ of ``source``.
+ """
+ assert source.done()
+ # A cancelled destination no longer cares about the outcome.
+ if dest.cancelled():
+ return
+ assert not dest.done()
+ if source.cancelled():
+ dest.cancel()
+ else:
+ exception = source.exception()
+ if exception is not None:
+ dest.set_exception(exception)
+ else:
+ result = source.result()
+ dest.set_result(result)
+
+
+def _chain_future(source, dest):
+ """Chain two futures so that when one completes, so does the other.
+ The result (or exception) of source will be copied to destination.
+ If destination is cancelled, source gets cancelled too.
+ Compatible with both asyncio.Future and concurrent.futures.Future.
+
+ Raises TypeError when either argument is not an asyncio.Future or
+ a concurrent.futures.Future.
+ """
+ if not isinstance(source, (asyncio.Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for source argument')
+ if not isinstance(dest, (asyncio.Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for destination argument')
+ # Only asyncio futures are bound to a loop; for concurrent futures the
+ # loop is None. This reads the private ``_loop`` attribute.
+ source_loop = source._loop if isinstance(source, asyncio.Future) else None
+ dest_loop = dest._loop if isinstance(dest, asyncio.Future) else None
+
+ # Pick the copy helper that matches the type of the receiving future.
+ def _set_state(future, other):
+ if isinstance(future, asyncio.Future):
+ _copy_future_state(other, future)
+ else:
+ _set_concurrent_future_state(future, other)
+
+ # Done-callback of ``dest``: forward a cancellation back to ``source``,
+ # hopping onto the source loop when it differs from the destination's.
+ def _call_check_cancel(destination):
+ if destination.cancelled():
+ if source_loop is None or source_loop is dest_loop:
+ source.cancel()
+ else:
+ source_loop.call_soon_threadsafe(source.cancel)
+
+ # Done-callback of ``source``: publish its outcome to ``dest``, hopping
+ # onto the destination loop when it differs from the source's.
+ def _call_set_state(source):
+ if dest_loop is None or dest_loop is source_loop:
+ _set_state(dest, source)
+ else:
+ dest_loop.call_soon_threadsafe(_set_state, dest, source)
+
+ dest.add_done_callback(_call_check_cancel)
+ source.add_done_callback(_call_set_state)
+
+
+def run_coroutine_threadsafe(coro, loop):
+    """Submit a coroutine object to a given event loop.
+
+    Return a concurrent.futures.Future to access the result.
+    Raise TypeError if ``coro`` is not a coroutine object.
+    """
+    if not asyncio.iscoroutine(coro):
+        raise TypeError('A coroutine object is required')
+    future = concurrent.futures.Future()
+    # ``asyncio.async(...)`` cannot be written literally: ``async`` is a
+    # reserved word from Python 3.7 on, which would make this module fail
+    # to parse. Prefer ``ensure_future`` and fall back to the legacy alias
+    # (looked up by name) on old asyncio/trollius releases.
+    ensure_future = (getattr(asyncio, 'ensure_future', None) or
+                     getattr(asyncio, 'async'))
+
+    def callback():
+        # Runs inside the loop thread: wrap the coroutine in a task and
+        # mirror its outcome onto the thread-safe future.
+        try:
+            _chain_future(ensure_future(coro, loop=loop), future)
+        except Exception as exc:
+            # Report the scheduling failure to the waiter, unless the
+            # waiter already cancelled, then let the loop see it too.
+            if future.set_running_or_notify_cancel():
+                future.set_exception(exc)
+            raise
+
+    loop.call_soon_threadsafe(callback)
+    return future
diff --git a/src/boost/python/server.py b/src/boost/python/server.py
index d7a8cc3..1efb7b7 100644
--- a/src/boost/python/server.py
+++ b/src/boost/python/server.py
@@ -17,7 +17,7 @@ from __future__ import absolute_import
__all__ = ["DeviceMeta", "Device", "LatestDeviceImpl", "attribute",
"command", "device_property", "class_property",
- "run", "server_run", "Server", "get_worker", "get_gevent_worker"]
+ "run", "server_run", "Server", "get_worker", "get_async_worker"]
import os
import sys
@@ -553,6 +553,8 @@ class Device(LatestDeviceImpl):
if args is None:
args = sys.argv[1:]
args = [cls.__name__] + list(args)
+ green_mode = getattr(cls, 'green_mode', None)
+ kwargs.setdefault("green_mode", green_mode)
return run((cls,), args, **kwargs)
@@ -976,7 +978,7 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
if green_mode is None:
from PyTango import get_green_mode
green_mode = get_green_mode()
- gevent_mode = green_mode == GreenMode.Gevent
+ async_mode = green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
import PyTango
if msg_stream is None:
@@ -992,15 +994,15 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
if util is None:
util = PyTango.Util(args)
- if gevent_mode:
+ if async_mode:
util.set_serial_model(PyTango.SerialModel.NO_SYNC)
- worker = _create_gevent_worker()
+ worker = _create_async_worker(green_mode)
set_worker(worker)
worker = get_worker()
if event_loop is not None:
- if gevent_mode:
+ if async_mode:
event_loop = functools.partial(worker.execute, event_loop)
util.server_set_event_loop(event_loop)
@@ -1016,10 +1018,10 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
worker.stop()
log.debug("server loop exit")
- if gevent_mode:
+ if async_mode:
tango_thread_id = worker.run_in_thread(tango_loop)
worker.run()
- log.debug("gevent worker finished")
+ log.debug("async worker finished")
else:
tango_loop()
@@ -1221,17 +1223,24 @@ def set_worker(worker):
__WORKER = worker
-__GEVENT_WORKER = None
-def get_gevent_worker():
- global __GEVENT_WORKER
- return __GEVENT_WORKER
+__ASYNC_WORKER = None
+def get_async_worker():
+ """Return the module-level ``__ASYNC_WORKER`` (initially ``None``)."""
+ global __ASYNC_WORKER
+ return __ASYNC_WORKER
-def _create_gevent_worker():
- global __GEVENT_WORKER
- if __GEVENT_WORKER:
- return __GEVENT_WORKER
+def _create_async_worker(green_mode):
+    """Return the shared asynchronous worker, creating it on first use.
+
+    The worker is cached in the module-level ``__ASYNC_WORKER`` so that
+    ``get_async_worker()`` and later calls see the same object.
+
+    :param green_mode: ``GreenMode.Gevent`` or ``GreenMode.Asyncio``
+    :raises ValueError: if ``green_mode`` is not an asynchronous mode
+    """
+    global __ASYNC_WORKER
+    if __ASYNC_WORKER:
+        return __ASYNC_WORKER
+    # The result must be stored under the *global* name (two leading
+    # underscores); a differently spelled name would only bind a local
+    # and the cache would never be filled.
+    if green_mode == GreenMode.Gevent:
+        __ASYNC_WORKER = _create_gevent_worker()
+    elif green_mode == GreenMode.Asyncio:
+        __ASYNC_WORKER = _create_asyncio_worker()
+    else:
+        raise ValueError(
+            "No asynchronous worker for green mode %r" % (green_mode,))
+    return __ASYNC_WORKER
+
+def _create_gevent_worker():
try:
from queue import Queue
except:
@@ -1301,8 +1310,63 @@ def _create_gevent_worker():
self.__tasks.put(task)
self.__watcher.send()
- __GEVENT_WORKER = GeventWorker()
- return __GEVENT_WORKER
+ return GeventWorker()
+
+
+def _create_asyncio_worker():
+    """Build a worker that executes callables on an asyncio event loop.
+
+    The current event loop is used; if none can be obtained (RuntimeError)
+    a new one is created and installed. ``trollius`` is used when
+    ``asyncio`` is not importable.
+
+    :return: a ``LoopExecutor`` bound to that loop
+    """
+    import concurrent.futures
+
+    try:
+        import asyncio
+    except ImportError:
+        import trollius as asyncio
+
+    try:
+        from asyncio import run_coroutine_threadsafe
+    except ImportError:
+        from .asyncio_tools import run_coroutine_threadsafe
+
+    # ``asyncio.async(...)`` cannot be written literally: ``async`` is a
+    # reserved word from Python 3.7 on and would make this whole module
+    # fail to parse. Prefer ``ensure_future`` and fall back to the legacy
+    # alias (looked up by name) on old asyncio/trollius releases.
+    ensure_future = (getattr(asyncio, 'ensure_future', None) or
+                     getattr(asyncio, 'async'))
+
+    class LoopExecutor(concurrent.futures.Executor):
+        """An Executor subclass that uses an event loop
+        to execute calls asynchronously."""
+
+        def __init__(self, loop=None):
+            """Initialize the executor with a given loop."""
+            self.loop = loop or asyncio.get_event_loop()
+
+        def submit(self, fn, *args, **kwargs):
+            """Schedule the callable fn, to be executed as fn(*args, **kwargs).
+            Return a Future representing the execution of the callable."""
+            corofn = asyncio.coroutine(lambda: fn(*args, **kwargs))
+            # Use the executor's own loop rather than a name captured from
+            # the enclosing function, so any loop passed to __init__ works.
+            return run_coroutine_threadsafe(corofn(), self.loop)
+
+        def run_in_thread(self, func, *args, **kwargs):
+            """Schedule a blocking callback."""
+            callback = lambda: func(*args, **kwargs)
+            coro = self.loop.run_in_executor(None, callback)
+            # That is not actually necessary since coro is actually
+            # a future. But it is an implementation detail and it
+            # might be changed later on.
+            ensure_future(coro)
+
+        def run(self, timeout=None):
+            """Run the asyncio event loop.
+
+            ``timeout`` is accepted for interface compatibility but is
+            not used: the loop runs until it is stopped."""
+            self.loop.run_forever()
+
+        def stop(self):
+            """Stop the asyncio event loop."""
+            # NOTE(review): this calls loop.stop() directly; confirm whether
+            # callers may invoke it from a thread other than the loop's.
+            self.loop.stop()
+
+        def execute(self, fn, *args, **kwargs):
+            """Execute the callable fn as fn(*args, **kwargs).
+
+            Blocks the calling thread until the loop has produced the
+            result, which is returned (or its exception raised)."""
+            return self.submit(fn, *args, **kwargs).result()
+
+    try:
+        loop = asyncio.get_event_loop()
+    except RuntimeError:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+    return LoopExecutor(loop=loop)
_CLEAN_UP_TEMPLATE = """
@@ -1542,8 +1606,8 @@ class Server:
self.__protocol = protocol
self.__tango_classes = _to_classes(tango_classes or [])
self.__tango_devices = []
- if self.gevent_mode:
- self.__worker = _create_gevent_worker()
+ if self.async_mode:
+ self.__worker = _create_async_worker(self.green_mode)
else:
self.__worker = get_worker()
set_worker(self.__worker)
@@ -1652,13 +1716,13 @@ class Server:
def __initialize(self):
self.log.debug("initialize")
- gevent_mode = self.gevent_mode
+ async_mode = self.async_mode
event_loop = self.__event_loop_callback
util = self.tango_util
u_instance = util.instance()
- if gevent_mode:
+ if async_mode:
if event_loop:
event_loop = functools.partial(self.worker.execute,
event_loop)
@@ -1667,11 +1731,11 @@ class Server:
_add_classes(util, self.__tango_classes)
- if gevent_mode:
+ if async_mode:
tango_thread_id = self.worker.run_in_thread(self.__tango_loop)
def __run(self, timeout=None):
- if self.gevent_mode:
+ if self.async_mode:
return self.worker.run(timeout=timeout)
else:
self.__tango_loop()
@@ -1737,8 +1801,8 @@ class Server:
self.__green_mode = gm
@property
- def gevent_mode(self):
- return self.green_mode == GreenMode.Gevent
+ def async_mode(self):
+ return self.green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
@property
def worker(self):
@@ -1840,12 +1904,12 @@ class Server:
def run(self, timeout=None):
self.log.debug("run")
- gevent_mode = self.gevent_mode
+ async_mode = self.async_mode
running = self.__running
if not running:
self.__prepare()
self.__initialize()
else:
- if not gevent_mode:
+ if not async_mode:
raise RuntimeError("Server is already running")
self.__run(timeout=timeout)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git
More information about the debian-science-commits
mailing list