使用测试服务器程序时
asyncio.start_server()
此方法似乎没有调用第一个
client_connected_cb
方法当程序在测试之外独立运行时,它会按预期工作。我怀疑这与未正确使用事件循环有关。
为了测试,我有一些非异步代码连接到套接字服务器,该服务器在侦听套接字时无限期挂起。
为什么事件循环不在单元测试中处理服务器代码?
测试:
@pytest.mark.asyncio
async def test_connect(server, connection):
await server.start()
connection.connect()
connection.join_lobby('John')
assert server.lobby.user_count() == 1
服务器:
async def start(self):
loop = asyncio.get_event_loop()
coro = asyncio.start_server(self.new_client, self.HOST, self.PORT, loop = loop)
self.server_coro = await loop.create_task(coro)
print('Serving on {}'.format(self.server_coro.sockets[0].getsockname()))
连接代码:
def connect(self):
self.s.connect((self.HOST, self.PORT))
print('Just connected')
def join_lobby(self, name):
self.s.send(b'Some data')
print('Just tried to join lobby')
msg = self.s.recv(BUFFER_SIZE)
print(msg)
注释掉s.recv行时,将显示以下输出。如果未对s.recv进行注释,测试将挂起(因为服务器从不发送数据):
Serving on ('0.0.0.0', 8888)
Just connected
Just tried to join lobby