代码之家  ›  专栏  ›  技术社区  ›  kaligne

多处理池和队列

  •  7
  • kaligne  · 技术社区  · 8 年前

    我正在使用多处理池。我需要将一个结构作为参数传递给一个必须在单独进程中使用的函数。我面临的问题是 multiprocessing.Pool ,因为我不能复制 Pool.Queue 也没有 Pool.Array 该结构将在运行中用于记录每个终止过程的结果。这是我的代码:

    import multiprocessing
    from multiprocessing import Process, Manager, Queue, Array
    import itertools
    import time
    
    def do_work(number, out_queue=None):
        if out_queue is not None:
            print "Treated nb ", number
            out_queue.append("Treated nb " + str(number))
        return 0
    
    
    def multi_run_wrapper(iter_values):
        return do_work(*iter_values)
    
    def test_pool():
        # Get the max cpu
        nb_proc = multiprocessing.cpu_count()
    
        pool = multiprocessing.Pool(processes=nb_proc)
        total_tasks = 16
        tasks = range(total_tasks)
    
        out_queue= Queue()  # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
        out_array = Array('i', total_tasks)
        iter_values = itertools.izip(tasks, itertools.repeat(out_array))
        results = pool.map_async(multi_run_wrapper, iter_values)
    
        pool.close()
        pool.join()
        print results._value
        while not out_queue.empty():
            print "queue: ", out_queue.get()
        print "out array: \n", out_array
    
    if __name__ == "__main__":
        test_pool()
    

    我需要在分离的进程中启动一个工作进程,并将输出队列作为参数传递。我还想指定包含有限数量运行进程的池。为此,我使用 pool.map_async() 作用不幸的是,上面的代码给了我一个错误:

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
        self.run()
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
        put(task)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
        assert_spawning(self)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
        ' through inheritance' % type(self).__name__
    RuntimeError: Queue objects should only be shared between processes through inheritance
    

    我相信这是因为 Queue 正如我在文件中看到的那样,永远不能复制。 然后我想让队列成为一个全局变量,这样我就不需要再传递它了,但在我看来这会很混乱。我还想过使用 multiprocessing.Array 相反

    out_array = Array('i', total_tasks)
    

    但与队列一样,也会出现同样的错误:

    # ...
    RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
    

    我需要使用这个特性-使用多处理和从子进程交换信息- 我希望我的代码保持干净整洁。

    我如何才能优雅地将队列传递给我的员工?

    当然,任何其他处理主要规范的方式都是受欢迎的。

    1 回复  |  直到 8 年前
        1
  •  16
  •   Community Egal    7 年前

    multiprocessing.Pool 不会接受 multiprocessing.Queue 作为其工作队列中的参数。我相信这是因为它在内部使用队列将数据来回发送到工作进程。有几个解决方法:

    1) 你真的需要使用队列吗?一个优点是 Pool 函数是将它们的返回值发送回主进程。通常,迭代池中的返回值比使用单独的队列要好。这也避免了通过检查引入的竞争条件 queue.empty()

    2) 如果必须使用 Queue ,您可以使用 multiprocessing.Manager 。这是共享队列的代理,可以作为参数传递给 水塘 功能。

    3) 你可以通过普通考试 队列 通过使用 initializer 创建时 水塘 (喜欢 https://stackoverflow.com/a/3843313 ). 这有点古怪。

    比赛条件 我上面提到的来自:

    while not out_queue.empty():
        print "queue: ", out_queue.get()
    

    当您有工作进程填充队列时,您可能会遇到队列当前为空的情况,因为工作进程将要向队列中放入内容。如果你检查 .empty() 这时候你会早点结束。更好的方法是 前哨的 队列中的值,以在完成向队列中放入数据时发出信号。