代码之家  ›  专栏  ›  技术社区  ›  Michael Fulton

异步IO。start\u服务器未在pytest asyncio测试内运行

  •  0
  • Michael Fulton  · 技术社区  · 6 年前

    使用测试服务器程序时 asyncio.start_server() 此方法似乎没有调用第一个 client_connected_cb 方法当程序在测试之外独立运行时,它会按预期工作。我怀疑这与未正确使用事件循环有关。

    为了测试,我有一些非异步代码连接到套接字服务器,该服务器在侦听套接字时无限期挂起。

    为什么事件循环不在单元测试中处理服务器代码?

    测试:

    @pytest.mark.asyncio
    async def test_connect(server, connection):
        await server.start()
        connection.connect()
        connection.join_lobby('John')
        assert server.lobby.user_count() == 1
    

    服务器:

    async def start(self):
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self.new_client, self.HOST, self.PORT, loop = loop)
        self.server_coro = await loop.create_task(coro)
        print('Serving on {}'.format(self.server_coro.sockets[0].getsockname()))
    

    连接代码:

    def connect(self):
        self.s.connect((self.HOST, self.PORT))
        print('Just connected')
    
    def join_lobby(self, name):
        self.s.send(b'Some data')
        print('Just tried to join lobby')
        msg = self.s.recv(BUFFER_SIZE)
        print(msg)
    

    注释掉s.recv行时,将显示以下输出。如果未对s.recv进行注释,测试将挂起(因为服务器从不发送数据):

    Serving on ('0.0.0.0', 8888)
    Just connected
    Just tried to join lobby
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   altendky    6 年前

    你不能这样混合使用阻塞代码和非阻塞代码。recv()调用正在阻止整个线程,并阻止异步服务器运行。您还必须使客户端异步。