代码之家  ›  专栏  ›  技术社区  ›  Andrey Fedorov

在附加到迭代器的同时使用multiprocessing.Pool进行映射的Pythonic方法是什么?

  •  2
  • Andrey Fedorov  · 技术社区  · 5 年前

    使用带有进程池的队列似乎是一种常见的模式,即。

    Pool(2).map(f, xs)
    

    但是f的主体可以附加到被映射的项上,例如。

    from multiprocessing import Pool
    
    xs = [0]
    
    def f(n):
        global xs
        if n < 10:
            xs.append(n + 1)
        return n
    
    Pool(2).map(f, xs)
    

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    我知道这是有可能的 manually “使用mt提供的原语,但它似乎是一个足够普遍的模式,必须有一个共同的解决方案。你知道吗?

    1 回复  |  直到 5 年前
        1
  •  0
  •   Patol75    5 年前

    根据@martineau的建议,您的代码可以更新为:

    import multiprocessing as mp
    
    
    def f(n, xs, xn):
        if n < 10:
            xn.append(n)
            xs.append(n + 1)
            xn.append(n)
            xs.append(n + 2)
    
    
    if __name__ == '__main__':
        with mp.Manager() as manager:
            xs = manager.list()
            xn = manager.list()
            with mp.Pool(processes=2) as pool:
                pool.starmap(f, [(n, xs, xn) for n in range(20)])
            print(xn)
            print(xs)
    

    这个指纹

    [3, 0, 3, 0, 4, 1, 4, 1, 5, 2, 5, 2, 6, 6, 7, 9, 7, 9, 8, 8]
    [4, 1, 5, 2, 5, 2, 6, 3, 6, 3, 7, 4, 7, 8, 8, 10, 9, 11, 9, 10]
    

    import multiprocessing as mp
    
    
    def f(n):
        thresh = 10
        if max(xs) <= thresh and n < thresh:
            xs.append(n + 1)
    
    
    if __name__ == '__main__':
        with mp.Manager() as manager:
            xs = manager.list([0])
            with mp.Pool(processes=2) as pool:
                pool.map(f, range(20))
            print(sorted(xs))
    

    这张照片

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
        2
  •  0
  •   Andrey Fedorov    5 年前

    可以创建一个类,该类可以从以下基本体中执行此操作:

    from multiprocessing import JoinableQueue, Process
    
    
    class PoolQueue(object):
        def __init__(self, n):
            self.num_procs = n
    
        def map(self, f, args):
            payloads = JoinableQueue()
            procs = []
    
            def add_task(arg):
                payloads.put(arg)
    
            def process_task():
                while True:
                    pl = payloads.get()
                    f(pl, add_task)
                    payloads.task_done()
    
            for arg in args:
                add_task(arg)
    
            procs = [Process(target=process_task) for _ in range(self.num_procs)]
            for p in procs:
                p.start()
    
            payloads.join()
            for p in procs:
                p.kill()
    

    from time import sleep
    from random import random
    
    
    def pause():
        sleep(random() / 100)
    
    
    def process(payload, add_task):
        print(payload)
        pause()
        if payload:
            add_task(payload[:-1])
        return payload
    
    
    if __name__ == '__main__':
        for x in range(1):
            PoolQueue(2).map(
                process,
                [
                    'abcdefghij',
                    '0123456789',
                    '!@#$%^&*()',
                ],
            )
    

    这里的一个问题是,它会死锁队列增长>32767个任务。这个 gevent.queue.JoinableQueue 处理得更好,但这超出了问题的范围。