代码之家  ›  专栏  ›  技术社区  ›  Frederico Schardong

使用numpy和多进程的怪异行为

  •  1
  • Frederico Schardong  · 技术社区  · 6 年前

    对不起,代码太长了,我试着让它尽可能简单,但是可以重复。

    简言之,这个python脚本启动了四个进程,这些进程将数字随机分配到列表中。然后,将结果添加到 multiprocessing.Queue() .

    import random
    import multiprocessing
    import numpy
    import sys
    
    def work(subarray, queue):
        result = [numpy.array([], dtype=numpy.uint64) for i in range (0, 4)]
    
        for element in numpy.nditer(subarray):
            index = random.randint(0, 3)
            result[index] = numpy.append(result[index], element)
    
        queue.put(result)
        print "after the queue.put"
    
    jobs = []
    queue = multiprocessing.Queue()
    
    subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)
    
    for i in range(0, 4):
        process = multiprocessing.Process(target=work, args=(subarray[i], queue))
        jobs.append(process)
        process.start()
    
    for j in jobs:
        j.join()
    
    print "the end"
    

    所有进程都运行 print "after the queue.put" 行。然而,它并没有达到 print "the end" 行。很奇怪,如果我换了 arange 10001 1001

    3 回复  |  直到 6 年前
        1
  •  1
  •   hmad    6 年前

    大多数子进程在put调用时阻塞。 multiprocessing queue put

    必要时阻塞,直到有空闲插槽可用。

    这可以通过在加入之前添加对queue.get()的调用来避免。

    if __name__ == '__main__':
        # main code here
    

    Compulsory usage of if name ==“ main ” in windows while using multiprocessing

        2
  •  1
  •   Jannick    6 年前

    我将把我的评论扩展成一个简短的回答。因为我也不明白这种奇怪的行为,这仅仅是一种权宜之计。

    第一个观察结果是,如果queue.put行被注释掉,则代码将一直运行到末尾,因此这一定是与队列相关的问题。结果实际上被添加到队列中,因此问题必须是队列和连接之间的相互作用。

    import random
    import multiprocessing
    import numpy
    import sys
    import time
    
    def work(subarray, queue):
        result = [numpy.array([], dtype=numpy.uint64) for i in range (4)]
    
        for element in numpy.nditer(subarray):
            index = random.randint(0, 3)
            result[index] = numpy.append(result[index], element)
    
        queue.put(result)
        print("after the queue.put")
    
    
    jobs = []
    queue = multiprocessing.Queue()
    
    subarray = numpy.array_split(numpy.arange(1, 15001, dtype=numpy.uint64), 4)
    
    
    for i in range(4):
        process = multiprocessing.Process(target=work, args=(subarray[i], queue))
        jobs.append(process)
        process.start()
    
    res = []
    while len(res)<4:
        res.append(queue.get())
    
    print("the end")
    
        3
  •  1
  •   hmad    6 年前

    这就是原因:

    Joining processes that use queues

    请记住,将项放入队列的进程将在终止之前等待,直到所有缓冲项都由feeder线程馈送到底层管道。(子进程可以调用队列的cancel_join_thread()方法来避免这种行为。)

    这意味着,无论何时使用队列,都需要确保在加入进程之前,已放入队列的所有项最终都将被删除。否则,您无法确保已将项放入队列的进程将终止。还要记住,非守护进程将自动加入。