代码之家  ›  专栏  ›  技术社区  ›  Chris Seymour

将Asyncio与多工作进程池执行器组合

  •  4
  • Chris Seymour  · 技术社区  · 6 年前

    是否可以采用如下阻塞函数 work 并让它在 ProcessPoolExecutor 不止一个工人?

    import asyncio
    from time import sleep, time
    from concurrent.futures import ProcessPoolExecutor
    
    num_jobs = 4
    queue = asyncio.Queue()
    executor = ProcessPoolExecutor(max_workers=num_jobs)
    loop = asyncio.get_event_loop()
    
    def work():
        sleep(1)
    
    async def producer():
        for i in range(num_jobs):
            results = await loop.run_in_executor(executor, work)
            await queue.put(results)
    
    async def consumer():
        completed = 0
        while completed < num_jobs:
            job = await queue.get()
            completed += 1
    
    s = time()
    loop.run_until_complete(asyncio.gather(producer(), consumer()))
    print("duration", time() - s)
    

    在具有4个以上内核的机器上运行上述操作需要大约4秒。你会怎么写 producer 上面的例子只需要~1秒?

    2 回复  |  直到 6 年前
        1
  •  6
  •   vaultah    6 年前

    await loop.run_in_executor(executor, work) 阻止循环直到 work 完成,因此一次只能运行一个函数。

    要同时运行作业,可以使用 asyncio.as_completed 以下内容:

    async def producer():
        tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
        for f in asyncio.as_completed(tasks, loop=loop):
            results = await f
            await queue.put(results)
    
        2
  •  2
  •   user4815162342    6 年前

    问题在于 producer .它不允许作业在后台运行,而是等待每个作业完成,从而序列化它们。如果你重写 生产者 像这样(然后离开 consumer 不变),您将获得预期的1s持续时间:

    async def producer():
        for i in range(num_jobs):
            fut = loop.run_in_executor(executor, work)
            fut.add_done_callback(lambda f: queue.put_nowait(f.result()))