
No performance gain after using multiprocessing for a queue-oriented function

  • nalzok granmirupa  · 6 years ago

    def enumerate_paths(n, k):
        """
        John wants to go up a flight of stairs that has N steps. He can take
        up to K steps each time. This function enumerates all the different
        ways he can go up this flight of stairs.
        """
        paths = []
        to_analyze = [(0,)]
    
        while to_analyze:
            path = to_analyze.pop()
            last_step = path[-1]
    
            if last_step >= n:
                # John has reached the top
                paths.append(path)
                continue
    
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                to_analyze.append(extended_path)
    
        return paths
    

    The output looks like this:

    >>> enumerate_paths(3, 2)
    [(0, 2, 4), (0, 2, 3), (0, 1, 3), (0, 1, 2, 4), (0, 1, 2, 3)]
    

    You may find the result confusing, so here is an explanation. For example, (0, 1, 2, 4) means John lands on step 1, then step 2, and finally step 4: his last 2-step move carries him past the top, because he only needs to climb 3 steps (the function ends a path as soon as last_step >= n).

    I tried to incorporate multiprocessing:

    import multiprocessing
    
    def enumerate_paths_worker(n, k, queue):
        paths = []
    
        while not queue.empty():
            path = queue.get()
            last_step = path[-1]
    
            if last_step >= n:
                # John has reached the top
                paths.append(path)
                continue
    
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                queue.put(extended_path)
    
        return paths
    
    
    def enumerate_paths(n, k):
        pool = multiprocessing.Pool()
        manager = multiprocessing.Manager()
        queue = manager.Queue()
    
        path_init = (0,)
        queue.put(path_init)
        apply_result = pool.apply_async(enumerate_paths_worker, (n, k, queue))
    
        return apply_result.get()
    

    The Python list to_analyze works like a task queue, and every item in the queue can be processed individually, so I think this function has the potential to be optimized with multi-threading/multi-processing. Also note that the order of the items does not matter: in fact, an optimized version could just as well return a Python set, a Numpy array, or a Pandas DataFrame, as long as it represents the same set of paths.
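    To make that independence concrete, here is a minimal sketch of my own (not part of the original post; the names enumerate_from and enumerate_paths_parallel are made up): the search can be seeded with one sub-search per possible first step, and the sub-searches run in separate processes.

    from concurrent.futures import ProcessPoolExecutor

    def enumerate_from(first_step, n, k):
        # the original depth-first search, but pinned to a fixed first step
        paths = []
        to_analyze = [(0, first_step)]
        while to_analyze:
            path = to_analyze.pop()
            last_step = path[-1]
            if last_step >= n:
                paths.append(path)
                continue
            for i in range(1, k + 1):
                to_analyze.append(path + (last_step + i,))
        return paths

    def enumerate_paths_parallel(n, k):
        # one independent sub-search per possible first step, each in its own process
        with ProcessPoolExecutor() as executor:
            chunks = list(executor.map(enumerate_from, range(1, k + 1), [n] * k, [k] * k))
        return [path for chunk in chunks for path in chunk]

    if __name__ == '__main__':
        print(sorted(enumerate_paths_parallel(3, 2)))

    For a problem this small the process start-up alone dominates, so the sketch only shows that the work splits cleanly, not that it is faster.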

    How much performance could be gained for a task like this by using scientific packages such as Numpy, Pandas, or Scipy?

    1 Answer
  •   Darkonaut    · 6 years ago

    TL;DR


    Your attempt with apply_async only ever uses a single worker from your pool, which is why you do not see any difference.
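    A quick way to see this (an illustration I am adding, not from the answer; who_am_i is a made-up helper): a single apply_async call schedules exactly one task, so only one pool process ever runs it, whereas submitting many tasks, e.g. via pool.map, spreads them over the whole pool.

    import multiprocessing
    import os
    import time

    def who_am_i(_):
        time.sleep(0.1)               # pretend to do some work
        return os.getpid()            # report which worker process ran the task

    if __name__ == '__main__':
        with multiprocessing.Pool(4) as pool:
            one_task = pool.apply_async(who_am_i, (0,)).get()   # one task -> one worker
            many_tasks = set(pool.map(who_am_i, range(8)))      # eight tasks -> whole pool
        print({one_task}, many_tasks)   # the second set typically contains several pids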

    But, as already said in the introduction, your computation will only benefit from multiprocessing if it is heavy enough to earn back the overhead of inter-process communication (and of process creation).
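    To illustrate that break-even point, here is a rough timing sketch of my own (the cheap/heavy helpers are made up, and the actual numbers depend entirely on the machine): for a trivial per-item function the pickling and IPC dominate and the pool loses, while for an expensive one the pool wins.

    import time
    from multiprocessing import Pool

    def cheap(x):
        return x + 1                      # almost no work per item

    def heavy(x):
        for _ in range(10 ** 5):          # artificially expensive per item
            x += 1
        return x

    def timed(label, fn):
        start = time.perf_counter()
        fn()
        print(f'{label}: {time.perf_counter() - start:.2f} s')

    if __name__ == '__main__':
        data = list(range(400))
        with Pool() as pool:
            timed('cheap, sequential', lambda: [cheap(x) for x in data])
            timed('cheap, pool.map', lambda: pool.map(cheap, data))
            timed('heavy, sequential', lambda: [heavy(x) for x in data])
            timed('heavy, pool.map', lambda: pool.map(heavy, data))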

    My solution below for the general problem uses a JoinableQueue; an extra function busy_foo makes the computation heavier, to show a case where multiprocessing does have its benefits.

    from multiprocessing import Process
    from multiprocessing import JoinableQueue as Queue
    import time
    
    SENTINEL = 'SENTINEL'
    
    def busy_foo(x = 10e6):
        for _ in range(int(x)):
            x -= 1
    
    
    def enumerate_paths(q_analyze, q_result, n, k):
        """
        John wants to go up a flight of stairs that has N steps. He can take
        up to K steps each time. This function enumerates all the different
        ways he can go up this flight of stairs.
        """
        for path in iter(q_analyze.get, SENTINEL):
            last_step = path[-1]
    
            if last_step >= n:
                busy_foo()
                # John has reached the top
                q_result.put(path)
                q_analyze.task_done()
                continue
            else:
                busy_foo()
                for i in range(1, k + 1):
                    # possible paths from this point
                    extended_path = path + (last_step + i,)
                    q_analyze.put(extended_path)
                q_analyze.task_done()
    
    
    if __name__ == '__main__':
    
        N_CORES = 4
    
        N = 6
        K = 2
    
        start = time.perf_counter()
        q_analyze = Queue()
        q_result = Queue()
    
        q_analyze.put((0,))
    
        pool = []
        for _ in range(N_CORES):
            pool.append(
                Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
            )
    
        for p in pool:
            p.start()
    
        q_analyze.join() # block until everything is processed
    
        for p in pool:
            q_analyze.put(SENTINEL)  # let the processes exit gracefully
    
        results = []
        while not q_result.empty():
            results.append(q_result.get())
    
        for p in pool:
            p.join()
    
        print(f'elapsed: {time.perf_counter() - start: .2f} s')
    

    Results

    Running the code above with busy_foo commented out, for N=30 and K=2 (2,178,309 results), the multiprocessed version (N_CORES=4) comes out far behind the sequential original.

    Pickling and unpickling, threads running against locks, and so on account for this huge difference.
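    To get a feel for where that overhead goes, here is a tiny measurement sketch I am adding (not from the answer): just pickling and unpickling one small path tuple per result, roughly what a manager queue or JoinableQueue has to do for every item that crosses a process boundary, already costs a noticeable amount of time at this item count.

    import pickle
    import time

    path = (0, 1, 2, 4)                   # a typical tiny work item
    start = time.perf_counter()
    for _ in range(2_178_309):            # roughly the number of results for N=30, K=2
        pickle.loads(pickle.dumps(path))
    print(f'{time.perf_counter() - start:.2f} s spent only on pickling/unpickling')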

    With busy_foo enabled and N=6, K=2, as in the code above:

    • N_CORES=4: 6.45 s
    • sequential original: 30.46 s

    Here the computation is heavy enough for the overhead to be earned back.

    Numpy