代码之家  ›  专栏  ›  技术社区  ›  codeape

长期运行过程中的SQLAlchemy会话管理

  •  3
  • codeape  · 技术社区  · 15 年前

    脚本:

    • 基于.NET的应用服务器( Wonderware IAS/System Platform )主机自动化对象,与工厂车间的各种设备通信。
    • cpython托管在此应用程序服务器内(使用 Python for .NET )
    • 自动化对象具有内置的脚本功能(使用基于.NET的自定义语言)。这些脚本调用python函数。

    python函数是系统的一部分,用于跟踪工厂中正在进行的工作。系统的目的是跟踪过程中产生的小部件,确保小部件以正确的顺序通过过程,并检查过程中是否满足某些条件。小部件生产历史和小部件状态存储在关系数据库中,这是sqlachemy发挥作用的地方。

    例如,当小部件通过扫描器时,自动化软件触发以下脚本(用应用程序服务器的自定义脚本语言编写):

    ' wiget_id and scanner_id provided by automation object
    ' ExecFunction() takes care of calling a CPython function
    retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
    ' if the python function raises an Exception, ErrorOccured will be true
    ' in this case, any errors should cause the production line to stop.
    if (retval.ErrorOccured) then
        ProductionLine.Running = False;
        InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
        InformationBoard.SoundAlarm = True
    end if;
    

    脚本调用 WidgetScanned python函数:

    # pywip/functions.py
    from pywip.database import session
    from pywip.model import Widget, WidgetHistoryItem
    from pywip import validation, StatusMessage
    from datetime import datetime
    
    def WidgetScanned(widget_id, scanner_id):
        widget = session.query(Widget).get(widget_id)
        validation.validate_widget_passed_scanner(widget, scanner) # raises exception on error
    
        widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
        widget.last_scanner = scanner_id
        widget.last_update = datetime.now()
    
        return StatusMessage("OK")
    
    # ... there are a dozen similar functions
    

    我的问题是: 在这个场景中,如何最好地管理SQLAlchemy会话? 应用程序服务器是一个长期运行的过程,通常在重启之间运行几个月。应用服务器是单线程的。

    目前,我是这样做的:

    我将一个修饰器应用于我提供给应用服务器的函数:

    # pywip/iasfunctions.py
    from pywip import functions
    
    def ias_session_handling(func):
        def _ias_session_handling(*args, **kwargs):
            try:
                retval = func(*args, **kwargs)
                session.commit()
                return retval
            except:
                session.rollback()
                raise
        return _ias_session_handling
    
    # ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
    WidgetScanned = ias_session_handling(functions.WidgetScanned)
    

    问题: 上面的decorator是否适合在长时间运行的过程中处理会话? 我应该打电话吗? session.remove() ?

    sqlAlchemy会话对象是作用域会话:

    # pywip/database.py
    from sqlalchemy.orm import scoped_session, sessionmaker
    
    session = scoped_session(sessionmaker())
    

    我想把会话管理从基本功能中去掉。有两个原因:

    1. 还有另一个函数系列,序列函数。序列函数调用几个基本函数。一个序列函数应该等于一个数据库事务。
    2. 我需要能够使用来自其他环境的库。a)来自TurboGears Web应用程序。在这种情况下,会话管理由涡轮齿轮机构完成。b)从IPython外壳。在这种情况下,提交/回滚将是显式的。

    (我真的为这个长问题感到抱歉。但我觉得我需要解释一下这种情况。也许不必要?)

    2 回复  |  直到 14 年前
        1
  •  4
  •   Ants Aasma    15 年前

    所描述的装饰器适用于长时间运行的应用程序,但如果在请求之间意外共享对象,则可能会遇到问题。若要使错误更早出现并且不损坏任何内容,最好放弃与session.remove()的会话。

    try:
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            session.rollback()
            raise
    finally:
        session.remove()
    

    或者如果你可以使用 with 上下文管理器:

    try:
        with session.registry().transaction:
            return func(*args, **kwargs)
    finally:
        session.remove()
    

    顺便说一下,你可能想用 .with_lockmode('update') 以便验证不会在过时数据上运行。

        2
  •  1
  •   silvrhand    14 年前

    让您的Wonderware管理员为您提供访问Wonderware Historian的权限,您可以非常容易地通过SQL调用来跟踪标签的值,您可以经常进行轮询。

    另一种选择是使用Archestra工具包来监听内部标签更新,并将服务器部署为Galaxy中的一个平台,您可以从中监听。

    推荐文章