代码之家  ›  专栏  ›  技术社区  ›  hyphen

Python Asyncio-RuntimeError:无法关闭正在运行的事件循环

  •  3
  • hyphen  · 技术社区  · 6 年前

    我正在尝试解决此错误: RuntimeError: Cannot close a running event loop

     def start_job(self):
    
            if self.auth_expire_timestamp < get_timestamp():
                api_obj = api_handler.Api('Api Name', self.dbObj)
                self.api_auth_resp = api_obj.get_auth_response()
                self.api_attr = api_obj.get_attributes()
    
    
            try:
                self.queue_manager(self.do_stuff(json_data))
            except aiohttp.ServerDisconnectedError as e:
                logging.info("Reconnecting...")
                api_obj = api_handler.Api('API Name', self.dbObj)
                self.api_auth_resp = api_obj.get_auth_response()
                self.api_attr = api_obj.get_attributes()
                self.run_eligibility()
    
    async def do_stuff(self, data):
    
        tasks = []
    
        async with aiohttp.ClientSession() as session:
            for row in data:
                task = asyncio.ensure_future(self.async_post('url', session, row))
                tasks.append(task)
            result = await asyncio.gather(*tasks)
        self.load_results(result)
    
    
    def queue_manager(self, method):
        self.loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(method)
        self.loop.run_until_complete(future)
    
    
    async def async_post(self, resource, session, data):
            async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
                resp = []
                try:
                    headers = response.headers['foo']
                    content = await response.read()
                    resp.append(headers)
                    resp.append(content)
                except KeyError as e:
                    logging.error('KeyError at async_post response')
                    logging.error(e)
            return resp
    
    
    def shutdown(self):
        //need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
        self.loop.close() 
        return True
    

    如何处理错误并正确关闭事件循环,以便可以启动一个新的事件循环并重新启动整个程序并继续。

    编辑:

    这就是我现在正在尝试的,基于 this SO answer queue_manager 我把它改成了:

    try:
       self.loop.run_until_complete(future)
    except Exception as e:
       future.cancel()
       self.loop.run_until_complete(future)
       future.exception()
    

    更新:

    shutdown() 方法并将此添加到我的 queue_manager() 取而代之的是方法,它似乎毫无问题地工作:

      try:
            self.loop.run_until_complete(future)
        except Exception as e:
            future.cancel()
            self.check_in_records()
            self.reconnect()
            self.start_job()
            future.exception()
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   user4815162342    6 年前

    要回答原来所说的问题,没有必要 close()

    根据更新中的代码 queue_manager 可能是这样的:

    try:
        self.loop.run_until_complete(future)
    except Exception as e:
        self.check_in_records()
        self.reconnect()
        self.start_job()
    

    取消 future 没有必要而且据我所知没有效果。这和 referenced answer 特别是对 KeyboardInterrupt ,特别是因为它是由asyncio本身引发的。 键盘中断 可以通过 run_until_complete 没有真正完成的未来。处理 Ctrl-C键 正确地使用异步是非常困难的,甚至是不可能的(请参见 here Ctrl-C键 键盘中断 Exception ,所以如果 Ctrl-C键 唯一的尸体甚至都不会执行。)

    这是一个正确的想法,但是(更新的)问题中的代码只是取消一个未来,一个已经传递给 . 回想一下,future是稍后将提供的结果值的占位符。一旦提供了值,就可以通过调用 future.result() . 如果未来的“价值”是个例外, 未来。结果() 会引发这个例外。 运行直至完成 有一个契约,它将运行事件循环,只要给定的未来生成一个值,然后它返回该值。如果“值”实际上是要引发的异常,那么 会重新提起的。例如:

    loop = asyncio.get_event_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_exception, ZeroDivisionError)
    # raises ZeroDivisionError, as that is the future's result,
    # manually set
    loop.run_until_complete(fut)
    

    Task ,一个特定于异步的对象,它将协程包装为 Future 运行直至完成 :

    async def fail():
        1/0
    
    loop = asyncio.get_event_loop()
    fut = loop.create_task(fail())
    # raises ZeroDivisionError, as that is the future's result,
    # because the coroutine raises it
    loop.run_until_complete(fut)
    

    运行直至完成 finishing意味着coroutine也完成了,返回了一个值或引发了一个异常,由 返回或提高。

    await 把它吊起来的表情 CancelledError . 除非任务明确捕获并禁止此异常(行为良好的异步代码不应执行此异常),否则任务将停止执行,并且 取消错误 将成为它的结果。但是,如果在 cancel() 是的,那么 取消() 无法执行任何操作,因为没有挂起 等待 注射 取消错误 进入。