代码之家  ›  专栏  ›  技术社区  ›  Peter Smit

如何将python dict与多处理同步

  •  24
  • Peter Smit  · 技术社区  · 14 年前

    我使用Python2.6和多处理模块实现多线程。现在我想要一个同步的dict(其中我真正需要的唯一原子操作是值上的+运算符)。

    我应该用多处理.sharedTypes.synchronized()调用包装dict吗?还是另一条路?

    4 回复  |  直到 6 年前
        1
  •  54
  •   Maxim AD7six    6 年前

    简介

    似乎有很多扶手椅的建议,没有工作的例子。这里列出的所有答案都不建议使用多处理,这有点令人失望和不安。作为python爱好者,我们应该支持我们的内置库,虽然并行处理和同步从来不是一件小事,但我相信只要设计得当,它就可以变得微不足道。这在现代多核体系结构中变得非常重要,而且压力也不够大!也就是说,我对多处理库还远远不满意,因为它还处于初级阶段,有很多陷阱、错误,而且正面向函数式编程(我讨厌函数式编程)。现在我还是喜欢 Pyro 由于多处理在服务器运行时无法共享新创建的对象这一严重限制,模块(这比它的时间要早得多)。管理器对象的“register”类方法只会在管理器(或其服务器)启动之前实际注册对象。足够的喋喋不休,更多的代码:

    Serv.Py

    from multiprocessing.managers import SyncManager
    
    
    class MyManager(SyncManager):
        pass
    
    
    syncdict = {}
    def get_dict():
        return syncdict
    
    if __name__ == "__main__":
        MyManager.register("syncdict", get_dict)
        manager = MyManager(("127.0.0.1", 5000), authkey="password")
        manager.start()
        raw_input("Press any key to kill server".center(50, "-"))
        manager.shutdown()
    

    在上面的代码示例中,server.py使用了多处理的syncmanager,它可以提供同步的共享对象。这段代码在解释器中运行是不起作用的,因为多处理库对于如何找到每个注册对象的“可调用”非常敏感。运行server.py将启动一个自定义的syncmanager,该管理器共享syncdict字典以供多个进程使用,并且可以连接到同一台计算机上的客户端,或者如果在环回以外的IP地址上运行,则连接到其他计算机上的客户端。在这种情况下,服务器在端口5000上的环回(127.0.0.1)上运行。在操作syncdict时,使用authkey参数使用安全连接。当按下任何键时,管理器将关闭。

    客户机

    from multiprocessing.managers import SyncManager
    import sys, time
    
    class MyManager(SyncManager):
        pass
    
    MyManager.register("syncdict")
    
    if __name__ == "__main__":
        manager = MyManager(("127.0.0.1", 5000), authkey="password")
        manager.connect()
        syncdict = manager.syncdict()
    
        print "dict = %s" % (dir(syncdict))
        key = raw_input("Enter key to update: ")
        inc = float(raw_input("Enter increment: "))
        sleep = float(raw_input("Enter sleep time (sec): "))
    
        try:
             #if the key doesn't exist create it
             if not syncdict.has_key(key):
                 syncdict.update([(key, 0)])
             #increment key value every sleep seconds
             #then print syncdict
             while True:
                  syncdict.update([(key, syncdict.get(key) + inc)])
                  time.sleep(sleep)
                  print "%s" % (syncdict)
        except KeyboardInterrupt:
             print "Killed client"
    

    客户端还必须创建一个自定义的SyncManager,这次注册“SyncDict”,而不传入一个可调用的来检索共享的Dict。然后使用自定义的SyncManager,使用端口5000上的环回IP地址(127.0.0.1)和一个建立安全连接的AuthKey进行连接在server.py中启动管理器。它通过调用管理器上注册的可调用项来检索共享的dict syncdict。它提示用户执行以下操作:

    1. 要操作的syncdict中的键
    2. 每一个周期增加键访问的值的量
    3. 每个周期的睡眠时间(秒)

    然后客户机检查密钥是否存在。如果没有,它会在syncdict上创建密钥。然后,客户机进入一个“无止境”循环,在其中按增量更新键的值,休眠指定的量,并打印syncdict,只为了重复此过程,直到发生键盘中断(ctrl+c)。

    恼人的问题

    1. 必须在启动管理器之前调用管理器的register方法,否则即使对管理器的dir调用将显示它确实具有已注册的方法,也会出现异常。
    2. 对dict的所有操作都必须使用方法而不是dict赋值(syncdict[“blast”]=2将由于多处理共享自定义对象的方式而失败)
    3. 使用SyncManager的dict方法可以缓解恼人的问题2,但恼人的问题1会阻止SyncManager.dict()返回的代理被注册和共享。(syncmanager.dict()只能在启动管理器之后调用,register只能在启动管理器之前工作,因此syncmanager.dict()仅在执行函数式编程并将代理作为参数传递给进程(如文档示例所示)时才有用)
    4. 服务器和客户机都必须注册,尽管从直观上看,客户机在连接到管理器之后就可以找到它(请将此添加到您的愿望列表多处理开发人员中)

    关闭

    我希望你和我一样喜欢这个相当透彻和略显费时的回答。我一直很难弄清楚为什么我要在多处理模块上挣扎这么久,而pyro让它变得轻而易举,现在多亏了这个答案,我已经一针见血了。我希望这对python社区在如何改进多处理模块方面是有用的,因为我相信它有很大的希望,但是在它的初级阶段还没有实现。尽管有这些恼人的问题,我认为这仍然是一个可行的选择,而且非常简单。您还可以使用syncmanager.dict()并将其作为参数传递给进程,就像文档显示的那样,根据您的需求,它可能是一个更简单的解决方案,但对我来说这感觉不自然。

        2
  •  4
  •   Alex Martelli    14 年前

    我将用一个单独的过程来维护“共享dict”:只需使用例如。 xmlrpclib 使少量代码可供其他进程使用,通过xmlrpclib公开,例如一个函数 key, increment 要执行增量,只需 key 并根据应用程序的需要返回值和语义详细信息(是否有丢失密钥等的默认值)。

    然后,您可以使用任何您喜欢的方法来实现共享dict专用进程:从内存中有一个简单dict的单线程服务器到一个简单的sqlite db等等,我建议您从“尽可能简单”的代码开始(取决于您是否需要 持久的 共享dict,或者持久性对您来说不是必需的),然后根据需要进行度量和优化。

        3
  •  4
  •   Frank V    14 年前

    以响应并发写问题的适当解决方案。我做了很快的调查发现 this article 建议使用锁/信号量解决方案。( http://effbot.org/zone/thread-synchronization.htm )

    虽然这个例子对字典没有特殊性,但我确信您可以编写一个基于类的包装器对象来帮助您基于这个想法使用字典。

    如果我需要以线程安全的方式实现类似的东西,我可能会使用python信号量解决方案。(假设我以前的合并技术不起作用)我认为信号量通常会由于其阻塞特性而降低线程效率。

    从站点:

    信号量是一种更高级的锁机制。一个信号量有一个内部计数器而不是一个锁标志,它只在超过给定数量的线程试图保持该信号量时才会阻塞。根据信号量的初始化方式,这允许多个线程同时访问同一代码段。

    semaphore = threading.BoundedSemaphore()
    semaphore.acquire() # decrements the counter
    ... access the shared resource; work with dictionary, add item or whatever.
    semaphore.release() # increments the counter
    
        4
  •  3
  •   Frank V    14 年前

    字典首先需要共享是有原因的吗?您是否可以让每个线程维护自己的字典实例,并在线程处理结束时合并,或者定期使用回调将各个线程字典的副本合并在一起?

    我不知道你到底在做什么,所以请记住,我的书面计划可能不会一字不差。我建议的是更高层次的设计理念。