代码之家  ›  专栏  ›  技术社区  ›  I159

使用ProcessPoolExecutor的异步IO运行执行器

  •  1
  • I159  · 技术社区  · 6 年前

    我尝试使用以下命令组合阻塞任务和非阻塞(I/O绑定)任务 ProcessPoolExecutor 发现自己的行为出乎意料。

    class BlockingQueueListener(BaseBlockingListener):
        def run(self):
            # Continioulsy listening a queue
            blocking_listen()
    
    class NonBlockingListener(BaseNonBlocking):
        def non_blocking_listen(self):
            while True:
               await self.get_message()
    
    
    def run(blocking):
        blocking.run()
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        executor = ProcessPoolExecutor()
        blocking = BlockingQueueListener()
        non_blocking = NonBlockingListener()
        future = loop.run_in_executor(executor, run(blocking))
        loop.run_until_complete(
            asyncio.gather(
                non_blocking.main(),
                future
            )
        )
    

    我原以为这两个任务将同时具有控制权,但阻止任务是在中开始的 ProcessPoolExecutor进程池执行器 阻止和从不返回控制。怎么会这样?在multiprocessing executor中,将常规协同路由和未来相结合的正确方法是什么?

    1 回复  |  直到 6 年前
        1
  •  4
  •   noxdafox    6 年前

    该行:

    future = loop.run_in_executor(executor, run(blocking))
    

    将实际运行阻塞函数并将其结果提供给执行器。

    根据 documentation ,您需要显式地传递函数,后跟其参数。

     future = loop.run_in_executor(executor, run, blocking)