我正在更新我的一个Python包,以便它是异步的(使用aiohttp而不是requests)。我还在更新我的单元测试,以便它们可以与新的异步版本一起工作,但我在这方面遇到了一些问题。
以下是我的包中的一段代码:
async def fetch(session, url):
while True:
try:
async with session.get(url) as response:
assert response.status == 200
return await response.json()
except Exception as error:
pass
class FPL():
def __init__(self, session):
self.session = session
async def get_user(self, user_id, return_json=False):
url = API_URLS["user"].format(user_id)
user = await fetch(self.session, url)
if return_json:
return user
return User(user, session=self.session)当这样使用时,所有这些似乎都能正常工作:
async def main():
async with aiohttp.ClientSession() as session:
fpl = FPL(session)
user = await fpl.get_user(3808385)
print(user)
loop = asynio.get_event_loop()
loop.run_until_complete(main())
>>> User 3808385不幸的是,我的单元测试遇到了一些问题。我想我可以简单地做一些事情,比如
def _run(coroutine):
return asyncio.get_event_loop().run_until_complete(coroutine)
class FPLTest(unittest.TestCase):
def setUp(self):
session = aiohttp.ClientSession()
self.fpl = FPL(session)
def test_user(self):
user = _run(self.fpl.get_user("3523615"))
self.assertIsInstance(user, User)
user = _run(self.fpl.get_user("3523615", True))
self.assertIsInstance(user, dict)
if __name__ == '__main__':
unittest.main()它会给出如下错误
DeprecationWarning: The object should be created from async function loop=loop)和
ResourceWarning: Unclosed client session <aiohttp.client.ClientSession object at 0x7fbe647fd208>我曾尝试向关闭会话的FPL类添加一个_close()函数,然后从测试中调用它,但这也不起作用,仍然显示有一个未关闭的客户端会话。
有没有可能做到这一点,我只是做错了什么,还是使用asynctest或pytest-aiohttp之类的东西更好?
编辑:我还查看了aiohttp的文档,发现了一个example,展示了如何使用标准库的单元测试来测试应用程序。不幸的是,我无法让它工作,因为AioHTTPTestCase中提供的loop从3.5开始就被弃用了,并抛出了一个错误:
class FPLTest(AioHTTPTestCase):
def setUp(self):
session = aiohttp.ClientSession()
self.fpl = FPL(session)
@unittest_run_loop
async def test_user(self):
user = await self.fpl.get_user("3523615")
self.assertIsInstance(user, User)
user = await self.fpl.get_user("3523615", True)
self.assertIsInstance(user, dict)给出
tests/test_fpl.py:20: DeprecationWarning: The object should be created from async function
session = aiohttp.ClientSession()
...
======================================================================
ERROR: test_user (__main__.FPLTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/amos/Documents/fpl/venv/lib/python3.7/site-packages/aiohttp/test_utils.py", line 477, in new_func
return self.loop.run_until_complete(
AttributeError: 'FPLTest' object has no attribute 'loop'
======================================================================
ERROR: test_user (__main__.FPLTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/amos/Documents/fpl/venv/lib/python3.7/site-packages/aiohttp/test_utils.py", line 451, in tearDown
self.loop.run_until_complete(self.tearDownAsync())
AttributeError: 'FPLTest' object has no attribute 'loop'发布于 2019-01-21 04:52:18
将pytest与aiohttp-pytest一起使用
async def test_test_user(loop):
async with aiohttp.ClientSession() as session:
fpl = FPL(session)
user = await fpl.get_user(3808385)
assert isinstance(user, User)现代python开发人员的一句格言:生命太短暂,不能不使用pytest。
您可能还想设置一个模拟服务器来在测试期间接收您的http请求,我没有一个简单的示例,但是可以在here上看到一个完整的工作示例。
https://stackoverflow.com/questions/54278925
复制相似问题