代码之家  ›  专栏  ›  技术社区  ›  barciewicz

如何使包含for循环非阻塞的函数?

  •  4
  • barciewicz  · 技术社区  · 6 年前

    我正在尝试使以下代码异步:

    import asyncio
    import random
    
    async def count():
        l = []
        for i in range(10000000):
            l.append(i)
        return random.choice(l)
    
    async def long_task1():
        print('Starting task 1...')
        task_output = await count()
        print('Task 1 output is {}'.format(task_output ))
    
    
    async def long_task2():
        print('Starting task 2...')
        task_output = await count()
        print('Task 2 output is {}'.format(task_output ))
    
    async def main():
        await asyncio.gather(long_task1(), long_task2())
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(main())
    

    目前它将同步工作。

    是因为 count 缺少功能 await 声明?

    我已经尝试过重新编写函数以包括 等待 :

    async def count():
        l = []
        for i in range(10000000):
            l.append(i)
        choice = await random.choice(l)
        return choice
    

    它将异步启动(两者都是 Starting task 1... Starting task 2... 会一个接一个地被打印出来),但是我会得到一个错误:

    类型错误:对象in t不能在“await”表达式中使用

    我知道错误的发生是因为 random.choice(l) 不是一个可以等待的(协同行动),但我不知道如何解决这个问题而不跑在圈里。我是否需要以某种方式将for循环重构为couroutine?

    3 回复  |  直到 6 年前
        1
  •  3
  •   user4815162342    6 年前

    是因为计数函数不存在吗? await 声明?

    简而言之,是的,您已经正确地识别了问题。要并行执行任务,您不仅需要指定 async def ,但也要等待暂停执行的内容,从而将控制返回到事件循环。在异步IO中,通常是一种会阻塞同步程序的调用,例如休眠或从尚未准备好读取的套接字中读取。

    强迫某人 temporary suspension ,您可以添加 await asyncio.sleep(0) 在循环中 count . 添加 等待 在一个普通功能前,如 random.choice ,不起作用,因为 等待 需要一个实现可等待接口的对象,并在代码中 随机选择 只返回一个整数。

        2
  •  1
  •   I Funball    6 年前

    您的代码调用 gather 两者兼而有之 long_task1 long_task2 同时地。然后你叫等待 count 在每个函数中。但这将 await 在该子程序上完成。因此,整个子例程仍将在下一个子例程开始之前完成。您需要一个函数来挂起整个任务。我创造了两种方法来规避这个问题。两者都涉及到创建新任务。

    创建新的子例程:

    async def count():
       l = []
       await asyncio.wait_for(loopProcess(l), timeout=1000000.0)
       return random.choice(l)
    
    async def loopProcess(l):
       for i in range(10000000):
          l.append(i)
    

    你也可以保留你的 计数 功能与原始代码相同,并更改 long_task(1/2) 像这样做 count() 一项新任务:

    async def long_task1():
       print('Starting task 1...')
       task_output = await asyncio.shield(count())
       print('Task 1 output is {}'.format(task_output ))
    
    
    async def long_task2():
       print('Starting task 2...')
       task_output = await asyncio.shield(count())
       print('Task 2 output is {}'.format(task_output ))
    

    您也可以使用 create_task 如果您有python 3.7。

    来源: https://docs.python.org/3/library/asyncio-task.html

        3
  •  1
  •   balki    6 年前

    为了使Asyncio正常工作,事件循环中不应该有任何CPU密集型任务(TightBigfor循环)。因为没有办法离开for循环。如果使用显式 asyncio.sleep 在这个循环中,你只是不必要地进出紧身衣,减慢整个过程。如果您的目标只是看看Asyncio是如何工作的,那就好了。

    但是在现实世界中,如果你有一个CPU密集型的任务,你有两个选择

    1. 使用多进程并将任务委托给其他进程。
    2. 使用释放gil并使用线程的本机代码绑定。

    顾名思义,该库用于异步IO。 async"io"