[pytango] 464/483: Fix bugs in gevent mode

Sandor Bodo-Merle sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:15:13 UTC 2017


This is an automated email from the git hooks/post-receive script.

sbodomerle-guest pushed a commit to annotated tag bliss_8.10
in repository pytango.

commit 27f46fe0edc43efb2cd673e565601edfe34e8450
Author: coutinho <coutinho at esrf.fr>
Date:   Tue Apr 14 14:14:04 2015 +0200

    Fix bugs in gevent mode
    
    - Make sure post_init_callback, init_device and delete_device all work
      in gevent mode in main thread
    - add green_mode parameter to attribute and command decorators
    - remove green_mode decorator
    - rename internal functions
---
 src/boost/python/server.py | 288 ++++++++++++++++++++-------------------------
 1 file changed, 129 insertions(+), 159 deletions(-)

diff --git a/src/boost/python/server.py b/src/boost/python/server.py
index 4564a22..b45a552 100644
--- a/src/boost/python/server.py
+++ b/src/boost/python/server.py
@@ -17,7 +17,7 @@ from __future__ import absolute_import
 
 __all__ = ["DeviceMeta", "Device", "LatestDeviceImpl", "attribute",
            "command", "device_property", "class_property",
-           "run", "server_run", "Server", "green_mode",  "get_gevent_worker"]
+           "run", "server_run", "Server", "get_worker", "get_gevent_worker"]
 
 import os
 import sys
@@ -167,7 +167,7 @@ def set_complex_value(attr, value):
             attr.set_value(value)
 
 
-def check_dev_klass_attr_read_method(tango_device_klass, attribute):
+def __patch_read_method(tango_device_klass, attribute):
     """
     Checks if method given by it's name for the given DeviceImpl
     class has the correct signature. If a read/write method doesn't
@@ -190,44 +190,32 @@ def check_dev_klass_attr_read_method(tango_device_klass, attribute):
 
     read_args = inspect.getargspec(read_method)
 
-    try:
-        green_mode = read_method._pytango_green_mode
-    except AttributeError:
-        green_mode = True
+    green_mode = attribute.read_green_mode
 
     if len(read_args.args) < 2:
-        if green_mode:
+        if green_mode == GreenMode.Synchronous:
             @functools.wraps(read_method)
             def read_attr(self, attr):
-                worker = get_gevent_worker()
-                if worker:
-                    ret = worker.execute(read_method, self)
-                else:
-                    ret = read_method(self)
+                ret = read_method(self)
                 if not attr.get_value_flag() and ret is not None:
                     set_complex_value(attr, ret)
                 return ret
         else:
             @functools.wraps(read_method)
             def read_attr(self, attr):
-                ret = read_method(self)
+                worker = get_worker()
+                ret = worker.execute(read_method, self)
                 if not attr.get_value_flag() and ret is not None:
                     set_complex_value(attr, ret)
                 return ret
     else:
-        if green_mode:
+        if green_mode == GreenMode.Synchronous:
+            read_attr = read_method
+        else:
             @functools.wraps(read_method)
             def read_attr(self, attr):
-                worker = get_gevent_worker()
-                if worker:
-                    ret = worker.execute(read_method, self, attr)
-                else:
-                    ret = read_method(self, attr)
-                return ret
-        else:
-            read_attr = read_method
+                return get_worker().execute(read_method, self, attr)
 
-    read_attr._pytango_green_mode = green_mode
 
     method_name = "__read_{0}_wrapper__".format(attribute.attr_name)
     attribute.read_method_name = method_name
@@ -235,7 +223,7 @@ def check_dev_klass_attr_read_method(tango_device_klass, attribute):
     setattr(tango_device_klass, method_name, read_attr)
 
 
-def check_dev_klass_attr_write_method(tango_device_klass, attribute):
+def __patch_write_method(tango_device_klass, attribute):
     """
     Checks if method given by it's name for the given DeviceImpl
     class has the correct signature. If a read/write method doesn't
@@ -256,33 +244,23 @@ def check_dev_klass_attr_write_method(tango_device_klass, attribute):
         method_name = attribute.write_method_name
         write_method = getattr(tango_device_klass, method_name)
 
-    try:
-        green_mode = write_method._pytango_gevent_mode
-    except AttributeError:
-        green_mode = True
+    green_mode = attribute.write_green_mode
 
-    if green_mode:
+    if green_mode == GreenMode.Synchronous:
         @functools.wraps(write_method)
         def write_attr(self, attr):
             value = attr.get_write_value()
-            worker = get_gevent_worker()
-            if worker:
-                ret = worker.execute(write_method, self, value)
-            else:
-                ret = write_method(self, value)
-            return ret
+            return write_method(self, value)
     else:
         @functools.wraps(write_method)
         def write_attr(self, attr):
             value = attr.get_write_value()
-            return write_method(self, value)
-
-    write_attr._pytango_green_mode = green_mode
+            return get_worker().execute(write_method, self, value)
 
     setattr(tango_device_klass, method_name, write_attr)
 
 
-def check_dev_klass_attr_methods(tango_device_klass, attribute):
+def __patch_attr_methods(tango_device_klass, attribute):
     """
     Checks if the read and write methods have the correct signature.
     If a read/write method doesn't have a parameter (the traditional
@@ -296,12 +274,31 @@ def check_dev_klass_attr_methods(tango_device_klass, attribute):
     """
     if attribute.attr_write in (AttrWriteType.READ,
                                 AttrWriteType.READ_WRITE):
-        check_dev_klass_attr_read_method(tango_device_klass,
-                                         attribute)
+        __patch_read_method(tango_device_klass, attribute)
     if attribute.attr_write in (AttrWriteType.WRITE,
                                 AttrWriteType.READ_WRITE):
-        check_dev_klass_attr_write_method(tango_device_klass,
-                                          attribute)
+        __patch_write_method(tango_device_klass, attribute)
+
+
+def __patch_init_delete_device(klass):
+    # TODO allow to force non green mode
+    green_mode = True
+
+    if green_mode == GreenMode.Synchronous:
+        pass
+    else:
+        init_device_orig = klass.init_device
+        @functools.wraps(init_device_orig)
+        def init_device(self):
+            return get_worker().execute(init_device_orig, self)
+        setattr(klass, "init_device", init_device)
+
+        delete_device_orig = klass.delete_device
+        @functools.wraps(delete_device_orig)
+        def delete_device(self):
+            return get_worker().execute(delete_device_orig, self)
+        setattr(klass, "delete_device", delete_device)
+
 
 
 class _DeviceClass(DeviceClass):
@@ -310,15 +307,6 @@ class _DeviceClass(DeviceClass):
         DeviceClass.__init__(self, name)
         self.set_type(name)
 
-    def _new_device(self, klass, dev_class, dev_name):
-        worker = get_gevent_worker()
-        if worker:
-            return worker.execute(DeviceClass._new_device, self,
-                                  klass, dev_class, dev_name)
-        else:
-            return DeviceClass._new_device(self, klass, dev_class,
-                                           dev_name)
-
     def dyn_attr(self, dev_list):
         """Invoked to create dynamic attributes for the given devices.
         Default implementation calls
@@ -341,7 +329,7 @@ class _DeviceClass(DeviceClass):
                                      traceback.format_exc())
 
 
-def create_tango_deviceclass_klass(tango_device_klass, attrs=None):
+def __create_tango_deviceclass_klass(tango_device_klass, attrs=None):
     klass_name = tango_device_klass.__name__
     if not issubclass(tango_device_klass, (Device)):
         msg = "{0} device must inherit from " \
@@ -363,7 +351,7 @@ def create_tango_deviceclass_klass(tango_device_klass, attrs=None):
             else:
                 attr_name = attr_obj.attr_name
             attr_list[attr_name] = attr_obj
-            check_dev_klass_attr_methods(tango_device_klass, attr_obj)
+            __patch_attr_methods(tango_device_klass, attr_obj)
         elif isinstance(attr_obj, device_property):
             attr_obj.name = attr_name
             device_property_list[attr_name] = [attr_obj.dtype,
@@ -379,6 +367,8 @@ def create_tango_deviceclass_klass(tango_device_klass, attrs=None):
                 cmd_name, cmd_info = attr_obj.__tango_command__
                 cmd_list[cmd_name] = cmd_info
 
+    __patch_init_delete_device(tango_device_klass)
+
     devclass_name = klass_name + "Class"
 
     devclass_attrs = dict(class_property_list=class_property_list,
@@ -387,10 +377,10 @@ def create_tango_deviceclass_klass(tango_device_klass, attrs=None):
     return type(devclass_name, (_DeviceClass,), devclass_attrs)
 
 
-def init_tango_device_klass(tango_device_klass, attrs=None,
-                            tango_class_name=None):
+def __init_tango_device_klass(tango_device_klass, attrs=None,
+                              tango_class_name=None):
     klass_name = tango_device_klass.__name__
-    tango_deviceclass_klass = create_tango_deviceclass_klass(
+    tango_deviceclass_klass = __create_tango_deviceclass_klass(
         tango_device_klass, attrs=attrs)
     if tango_class_name is None:
         if hasattr(tango_device_klass, "TangoClassName"):
@@ -403,15 +393,6 @@ def init_tango_device_klass(tango_device_klass, attrs=None,
     return tango_device_klass
 
 
-def create_tango_device_klass(name, bases, attrs):
-    klass_name = name
-
-    LatestDeviceImplMeta = type(LatestDeviceImpl)
-    klass = LatestDeviceImplMeta(klass_name, bases, attrs)
-    init_tango_device_klass(klass, attrs)
-    return klass
-
-
 def DeviceMeta(name, bases, attrs):
     """
     The :py:data:`metaclass` callable for :class:`Device`.Every
@@ -433,7 +414,10 @@ def DeviceMeta(name, bases, attrs):
         class PowerSupply(Device, metaclass=DeviceMeta):
             pass
     """
-    return create_tango_device_klass(name, bases, attrs)
+    LatestDeviceImplMeta = type(LatestDeviceImpl)
+    klass = LatestDeviceImplMeta(name, bases, attrs)
+    __init_tango_device_klass(klass, attrs)
+    return klass
 
 
 class Device(LatestDeviceImpl):
@@ -531,6 +515,9 @@ class attribute(AttrData):
     archive_abs_change     :obj:`str`                       None
     archive_rel_change     :obj:`str`                       None
     archive_period         :obj:`str`                       None
+    green_mode             :obj:`~PyTango.GreenMode`        None                                    green mode for read and write. None means use server green mode.
+    read_green_mode        :obj:`~PyTango.GreenMode`        None                                    green mode for read. None means use server green mode.
+    write_green_mode       :obj:`~PyTango.GreenMode`        None                                    green mode for write. None means use server green mode.
     ===================== ================================ ======================================= =======================================================================================
 
     .. note::
@@ -583,6 +570,9 @@ class attribute(AttrData):
         self._kwargs = dict(kwargs)
         name = kwargs.pop("name", None)
         class_name = kwargs.pop("class_name", None)
+        green_mode = kwargs.pop("green_mode", True)
+        self.read_green_mode = kwargs.pop("read_green_mode", green_mode)
+        self.write_green_mode = kwargs.pop("write_green_mode", green_mode)
 
         if fget:
             if inspect.isroutine(fget):
@@ -643,7 +633,7 @@ class attribute(AttrData):
 
 
 def command(f=None, dtype_in=None, dformat_in=None, doc_in="",
-            dtype_out=None, dformat_out=None, doc_out="",):
+            dtype_out=None, dformat_out=None, doc_out="", green_mode=None):
     """
     Declares a new tango command in a :class:`Device`.
     To be used like a decorator in the methods you want to declare as
@@ -690,44 +680,36 @@ def command(f=None, dtype_in=None, dformat_in=None, doc_in="",
     :type dformat_out: AttrDataFormat
     :param doc_out: return value documentation
     :type doc_out: str
+    :param green_mode:
+        set green mode on this specific command. Default value is None meaning
+        use the server green mode. Set it to GreenMode.Synchronous to force
+        a non green command in a green server.
     """
     if f is None:
         return functools.partial(command,
             dtype_in=dtype_in, dformat_in=dformat_in, doc_in=doc_in,
-            dtype_out=dtype_out, dformat_out=dformat_out,
-            doc_out=doc_out)
+            dtype_out=dtype_out, dformat_out=dformat_out, doc_out=doc_out,
+            green_mode=green_mode)
     name = f.__name__
 
     dtype_in, dformat_in = _get_tango_type_format(dtype_in, dformat_in)
-    dtype_out, dformat_out = _get_tango_type_format(dtype_out,
-                                                    dformat_out)
+    dtype_out, dformat_out = _get_tango_type_format(dtype_out, dformat_out)
 
     din = [from_typeformat_to_type(dtype_in, dformat_in), doc_in]
     dout = [from_typeformat_to_type(dtype_out, dformat_out), doc_out]
 
-    try:
-        green_mode = f._pytango_green_mode
-    except AttributeError:
-        green_mode = True
-
-    if green_mode:
+    if green_mode == GreenMode.Synchronous:
+        cmd = f
+    else:
         @functools.wraps(f)
         def cmd(self, *args, **kwargs):
-            worker = get_gevent_worker()
-            if worker:
-                ret = worker.execute(f, self, *args, **kwargs)
-            else:
-                ret = f(self, *args, **kwargs)
-            return ret
-    else:
-        print("found non green command", f)
-        cmd = f
+            return get_worker().execute(f, self, *args, **kwargs)
+
     cmd.__tango_command__ = name, [din, dout]
-    cmd._pytango_green_mode = green_mode
     return cmd
 
 
-class _property(object):
+class _BaseProperty(object):
 
     def __init__(self, dtype, doc='', default_value=None):
         self.name = None
@@ -747,7 +729,7 @@ class _property(object):
         del obj._tango_properties[self.name]
 
 
-class device_property(_property):
+class device_property(_BaseProperty):
     """
     Declares a new tango device property in a :class:`Device`. To be
     used like the python native :obj:`property` function. For example,
@@ -769,7 +751,7 @@ class device_property(_property):
     pass
 
 
-class class_property(_property):
+class class_property(_BaseProperty):
     """
     Declares a new tango class property in a :class:`Device`. To be
     used like the python native :obj:`property` function. For example,
@@ -883,39 +865,39 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
     if util is None:
         util = PyTango.Util(args)
 
-    if gevent_mode and event_loop:
-        gevent_worker = _create_gevent_worker()
-        event_loop = functools.partial(gevent_worker.execute, event_loop)
+    if gevent_mode:
+        worker = _create_gevent_worker()
+        set_worker(worker)
+
+    worker = get_worker()
 
     if event_loop is not None:
+        if gevent_mode:
+            event_loop = functools.partial(worker.execute, event_loop)
         util.server_set_event_loop(event_loop)
 
     log = logging.getLogger("PyTango")
 
-    def tango_loop(worker=None):
-        log.debug("Tango loop started")
+    def tango_loop():
+        log.debug("server loop started")
         _add_classes(util, classes)
-        log.debug("Tango init")
         util.server_init()
-        if worker:
-            worker.execute(post_init_callback)
+        worker.execute(post_init_callback)
         write("Ready to accept request\n")
         util.server_run()
-        if worker:
-            worker.stop()
-        log.debug("Tango loop exit")
+        worker.stop()
+        log.debug("server loop exit")
 
     if gevent_mode:
-        gevent_worker = _create_gevent_worker()
-        start_new_thread = gevent_worker._threading.start_new_thread
-        tango_thread_id = start_new_thread(tango_loop, (gevent_worker,))
-        gevent_worker.run()
-        log.debug("Gevent_worker finished")
+        tango_thread_id = worker.run_in_thread(tango_loop)
+        worker.run()
+        log.debug("gevent worker finished")
     else:
         tango_loop()
 
     return util
 
+
 def run(classes, args=None, msg_stream=sys.stdout,
         verbose=False, util=None, event_loop=None,
         post_init_callback=None, green_mode=None):
@@ -1091,25 +1073,32 @@ def server_run(classes, args=None, msg_stream=sys.stdout,
                green_mode=green_mode)
 
 
-def green_mode(f=None, enable=True):
-    """
-    Decorator to force the method to run in the tango thread instead of the
-    green thread.
-    """
-    if f is None:
-        return functools.partial(green_mode, enable=enable)
+class BaseWorker:
+    def __init__(self, max_queue_size=0):
+        pass
+    def execute(self, func, *args, **kwargs):
+        return func(*args, **kwargs)
+    def stop(self):
+        pass
 
-    f._pytango_green_mode = enable
-    print("Set %s green mode to %s" % (f, enable))
-    return f
 
+__WORKER = BaseWorker()
+def get_worker():
+    global __WORKER
+    return __WORKER
 
-__GEVENT_WORKER = None
 
+def set_worker(worker):
+    global __WORKER
+    __WORKER = worker
+
+
+__GEVENT_WORKER = None
 def get_gevent_worker():
     global __GEVENT_WORKER
     return __GEVENT_WORKER
 
+
 def _create_gevent_worker():
     global __GEVENT_WORKER
     if __GEVENT_WORKER:
@@ -1122,10 +1111,9 @@ def _create_gevent_worker():
 
     import gevent
     import gevent.event
+    import gevent._threading
 
-    class GeventWorker:
-
-        from gevent import _threading
+    class GeventWorker(BaseWorker):
 
         class Task:
 
@@ -1159,12 +1147,16 @@ def _create_gevent_worker():
             task = self.__tasks.get()
             return task.run()
 
+        def run_in_thread(self, func, *args, **kwargs):
+            thread_id = gevent._threading.start_new_thread(func, args, kwargs)
+            return thread_id
+
         def run(self, timeout=None):
             return gevent.wait(objects=(self.__stop_event,),
                                timeout=timeout)
 
         def execute(self, func, *args, **kwargs):
-            event = self._threading.Event()
+            event = gevent._threading.Event()
             task = self.Task(event, func, *args, **kwargs)
             self.__tasks.put(task)
             self.__watcher.send()
@@ -1292,20 +1284,14 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
                 def _command(dev, func_name=None):
                     obj = dev._object
                     f = getattr(obj, func_name)
-                    if server.gevent_worker:
-                        result = server.gevent_worker.execute(f)
-                    else:
-                        result = f()
+                    result = server.worker.execute(f)
                     return server.dumps(result)
             else:
                 def _command(dev, param, func_name=None):
                     obj = dev._object
                     args, kwargs = loads(*param)
                     f = getattr(obj, func_name)
-                    if server.gevent_worker:
-                        result = server.gevent_worker.execute(f, *args, **kwargs)
-                    else:
-                        result = f(*args, **kwargs)
+                    result = server.worker.execute(f, *args, **kwargs)
                     return server.dumps(result)
             cmd = functools.partial(_command, func_name=name)
             cmd.__name__ = name
@@ -1333,34 +1319,22 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
                 fmt = AttrDataFormat.SCALAR
                 def read(dev, attr):
                     name = attr.get_name()
-                    if server.gevent_worker:
-                        value = server.gevent_worker.execute(getattr, dev._object, name)
-                    else:
-                        value = getattr(dev._object, name)
+                    value = server.worker.execute(getattr, dev._object, name)
                     attr.set_value(*server.dumps(value))
                 def write(dev, attr):
                     name = attr.get_name()
                     value = attr.get_write_value()
                     value = loads(*value)
-                    if server.gevent_worker:
-                        server.gevent_worker.execute(setattr, dev._object, name, value)
-                    else:
-                        setattr(dev._object, name, value)
+                    server.worker.execute(setattr, dev._object, name, value)
             else:
                 def read(dev, attr):
                     name = attr.get_name()
-                    if server.gevent_worker:
-                        value = server.gevent_worker.execute(getattr, dev._object, name)
-                    else:
-                        value = getattr(dev._object, name)
+                    value = server.worker.execute(getattr, dev._object, name)
                     attr.set_value(value)
                 def write(dev, attr):
                     name = attr.get_name()
                     value = attr.get_write_value()
-                    if server.gevent_worker:
-                        server.gevent_worker.execute(setattr, dev._object, name, value)
-                    else:
-                        setattr(dev._object, name, value)
+                    server.worker.execute(setattr, dev._object, name, value)
             read.__name__ = "_read_" + name
             setattr(DeviceDispatcher, read.__name__, read)
 
@@ -1441,7 +1415,8 @@ class Server:
         if self.gevent_mode:
             self.__worker = _create_gevent_worker()
         else:
-            self.__worker = None
+            self.__worker = get_worker()
+        set_worker(self.__worker)
         self.log = logging.getLogger("PyTango.Server")
         self.__phase = Server.Phase0
 
@@ -1455,10 +1430,7 @@ class Server:
     def __exec_cb(self, cb):
         if not cb:
             return
-        if self.gevent_mode:
-            self.__worker.execute(cb)
-        else:
-            cb()
+        self.worker.execute(cb)
 
     def __find_tango_class(self, key):
         pass
@@ -1558,7 +1530,7 @@ class Server:
 
         if gevent_mode:
             if event_loop:
-                event_loop = functools.partial(self.__worker.execute,
+                event_loop = functools.partial(self.worker.execute,
                                                event_loop)
         if event_loop:
             u_instance.server_set_event_loop(event_loop)
@@ -1566,28 +1538,26 @@ class Server:
         _add_classes(util, self.__tango_classes)
 
         if gevent_mode:
-            start_new_thread = self.__worker._threading.start_new_thread
-            tango_thread_id = start_new_thread(self.__tango_loop, ())
+            tango_thread_id = self.worker.run_in_thread(self.__tango_loop)
 
     def __run(self, timeout=None):
         if self.gevent_mode:
-            return self.__worker.run(timeout=timeout)
+            return self.worker.run(timeout=timeout)
         else:
             self.__tango_loop()
 
     def __tango_loop(self):
-        self.log.debug("tango_loop")
+        self.log.debug("server loop started")
         self.__running = True
         u_instance = self.tango_util.instance()
         u_instance.server_init()
         self._phase = Server.Phase2
         self.log.info("Ready to accept request")
         u_instance.server_run()
-        if self.gevent_mode:
-            self.__worker.stop()
+        self.worker.stop()
         if self.__auto_clean:
             self.__clean_up_process()
-        self.log.debug("Tango loop exit")
+        self.log.debug("server loop exit")
 
     @property
     def _phase(self):

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git



More information about the debian-science-commits mailing list