代码之家  ›  专栏  ›  技术社区  ›  pstatix

在异步程序中向文件写入web响应

  •  1
  • pstatix  · 技术社区  · 6 年前

    ThreadPoolExecutors 所有异步调用都使用 asyncio aiohttp . 由于网络调用是非阻塞的IO,所以大部分转换都是直接的,这是保存响应使我陷入了一个难题。

    我使用的所有示例,甚至两个库的文档,都使用 asyncio.gather()

    解决这个问题的合适方法是什么?要用吗 asyncio.as_completed() 然后:

    for f in as_completed(aws):
        earliest_result = await f
        # Assumes `loop` defined under `if __name__` block outside coroutine
        loop = get_event_loop()
        # Run the blocking IO in an exectuor and write to file
        _ = await loop.run_in_executor(None, save_result, earliest_result)
    

    这是不是引入了一个线程(假设我使用 ThreadPoolExecutor 在默认情况下)因此使它成为一个异步的、多线程的程序,而不是一个异步的、单线程的程序?

    而且,这能保证只有1个 earliest_result await loop.run_in_executor(...) 要运行,则会出现另一个结果,我尝试运行到同一个文件;我可以使用信号量进行限制。

    0 回复  |  直到 5 年前
        1
  •  1
  •   merrydeath    5 年前

    我建议你利用 Streaming API . 将响应直接写入磁盘而不是RAM,并从gather返回文件名而不是响应本身。这样做根本不会占用太多内存。这是我意思的一个小演示:

    import asyncio
    
    import aiofiles
    from aiohttp import ClientSession
    
    
    async def make_request(session, url):
        response = await session.request(method="GET", url=url)
        filename = url.split('/')[-1]
        async for data in response.content.iter_chunked(1024):
            async with aiofiles.open(filename, "ba") as f:
                await f.write(data)
        return filename
    
    
    async def main():
        urls = ['https://github.com/Tinche/aiofiles',
                'https://github.com/aio-libs/aiohttp']
        async with ClientSession() as session:
            coros = [make_request(session, url) for url in urls]
            result_files = await asyncio.gather(*coros)
        print(result_files)
    
    
    asyncio.run(main())
    
        2
  •  0
  •   Mikhail Gerasimov    6 年前

    在我的例子中,这些结果可以是许多GB范围内的文件,我不想将它们存储在内存中。

    如果我是对的,在你的密码里 aws 意味着下载单个文件时,可能会遇到以下问题:while as_completed 允许尽快将数据从RAM交换到HDD 自动气象站

    为了避免这种情况,您需要使用信号量来确保没有多少文件是并行下载的,从而防止RAM过度使用。

    下面是使用 semaphore .

    这是否引入了一个线程(假设我使用一个ThreadPoolExecutor 或者是异步的单线程程序?

    我不确定,我理解你的问题,但是是的,你的代码将使用线程,但是 save_result

    此外,这是否确保只向1个最早的结果写入

    是的,是[*]。准确地说是关键词 await 代码片段的最后一行将确保:

    _ = await loop.run_in_executor(None, save_result, earliest_result)
    

    你可以读为:“开始执行 run_in_executor 异步并挂起此行的执行流,直到 完成并返回结果”。


    [*]是的,如果你不为 f in as_completed(aws) 循环首先是平行的。