代码之家  ›  专栏  ›  技术社区  ›  skimobear

asyncio.create_connection为每次调用实例化一个ThreadPoolExecuter

  •  0
  • skimobear  · 技术社区  · 5 年前

    下面的tcp客户机代码基本上是循环的,直到它能够与tcp服务器建立连接为止。在vs代码中调试时,我注意到每次调用create_connection()似乎都会创建一个新的ThreadPoolExecutor实例。当我的tcp服务器没有运行时,这可能会造成一个问题,因为似乎会创建无限数量的实例。当服务器不运行时,处理多次调用create_connection()的正确方法是什么?

    enter image description here

    async def do_tcp_connect(host, port, queue, shutdown_event, callbacks, connection_list):
    
        logger = logging.getLogger('do_tcp_connect')
        logger.debug('do_tcp_connect(): started')
    
        while True:
            try:
                transport, protocol = await asyncio.get_running_loop().create_connection(lambda: asyncio_callback(host, port, queue, shutdown_event, callbacks, connection_list), host, port)
                return
            except (KeyboardInterrupt, SystemExit):
                raise
            except asyncio.CancelledError as ex:
                logger.error("network_queue_consumer(): CancelledError: A asyncio coroutine task was cancelled, error={}".format(ex))
            except AttributeError as ex:
                logger.error("network_queue_consumer(): AttributeError: An attribute reference or assignment has failed, error:{}".format(ex))
            except OSError as ex:
                logger.error("network_queue_consumer(): OSError error:{}".format(ex))
            except asyncio.TimeoutError:
                continue
            except Exception as ex:
                logger.error('network_queue_consumer(): Exception occurred, error={}'.format(ex))
            except:
                logger.error("network_queue_consumer(): Unexpected error: {}".format(sys.exc_info()[0]))
                raise
    
            await asyncio.sleep(5.0)
    
        logger.debug('do_tcp_connect(): completed')
    
    0 回复  |  直到 5 年前
        1
  •  0
  •   skimobear    5 年前

    我查看了异步源代码,发现ThreadPoolExecutor正在下面的方法中实例化。调用循环。set_default_executor(executor)似乎解决了我遇到的问题。

    asyncio:base_events.py:742
        def run_in_executor(self, executor, func, *args):
            self._check_closed()
            if self._debug:
                self._check_callback(func, 'run_in_executor')
            if executor is None:
                executor = self._default_executor
                if executor is None:
                    executor = concurrent.futures.ThreadPoolExecutor()
                    self._default_executor = executor
            return futures.wrap_future(
                executor.submit(func, *args), loop=self)