我试图对异步套接字服务器进行单元测试,并使用pytest-asyncio使pytest与异步代码库兼容。服务器一旦启动,总是要通过while循环发送回复,并且可能会花费大部分时间在client_loop()中等待传入的消息。问题是,在单元测试框架终止事件循环并发出此警告之前,无法取消此任务:
任务已被销毁,但仍未完成! 任务:<任务挂起的coro=< Server.new_client()完成,定义在/.path./server.py:16> wait_for=<未来待决cb=< TaskWakeupMethWrapper对象at 0x106d7cbe8>()>>
我唯一可以访问的任务是asyncio.create_task()创建的任务,它似乎不是相同的任务。该任务如下:
任务:<任务挂起的coro=< start_server()运行在/usr/local/c业力/python/.不同路径./streams.py s.py:86>
因此,在此任务上调用task.cancel(); await task.wait_cancelled()没有任何效果。
如何编写此单元测试,以便为每个测试干净地启动和启动服务器,而不切断可能仍在运行的任务?
下面是一个例子:
test_server.py
import pytest
import asyncio
@pytest.fixture
async def server(event_loop):
from server import Server
the_server = Server()
await the_server.start()
yield the_server
the_server.stop()
@pytest.mark.asyncio
async def test_connect(server):
loop = asyncio.get_event_loop()
reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
writer.write(b'something')
await reader.read(100)
writer.write(b'something else')
await reader.read(100)
assert 1server.py
import asyncio
class Server():
async def start(self):
loop = asyncio.get_event_loop()
coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
task = loop.create_task(coro)
print('\n')
print(task)
self.server = await task
def stop(self):
self.server.close()
async def new_client(self, reader, writer):
await self.client_loop(reader, writer)
async def client_loop(self, reader, writer):
while True:
await reader.read(100)
writer.write(b'reply')如果您想运行这个例子,只需运行pip3 install pytest-asyncio和pytest就可以获得这个插件。
发布于 2018-04-01 18:07:41
asyncio.Server.stop()方法没有完全停止服务器。它只是停止接受新的联系。关闭前创建的任何连接将继续执行,直到完成为止。
根据文献资料 (强调地雷)
停止服务:关闭侦听套接字并将套接字属性设置为None。 表示现有传入客户端连接的套接字仍处于打开状态. 服务器异步关闭,使用wait_closed() coroutine等待服务器关闭。
在本例中,所有连接都被发送到无限client_loop方法。
更好的解决方案是在new_client()中创建一个任务集合,负责执行client_loop()逻辑,而不是直接等待方法。使用这种方法,可以在stop()方法中干净地终止所有打开的任务。
发布于 2018-03-23 07:30:06
你必须在打电话给await self.server.wait_closed()后再打电话给self.server.close()。
因此,您的夹具应该如下所示:
@pytest.fixture
async def server(event_loop):
from server import Server
the_server = Server()
await the_server.start()
yield the_server
await the_server.stop()您的stop方法的Server应该如下所示:
async def stop(self):
self.server.close()
await self.server.wait_closed()详情请参见文献资料。
https://stackoverflow.com/questions/49439236
复制相似问题