是否可以将执行从函数返回到event loop。并且一旦Task完成,返回到函数并继续执行?
我正在尝试使用pytest-asyncio插件
示例:
@pytest.mark.asyncio
async def test_async1(event_loop):
print('start 1')
res = event_loop.create_task(
send_async_request("http://test.com", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function
print('end1',res)
@pytest.mark.asyncio
async def test_async2(event_loop):
print('start 2')
res = event_loop.create_task(
send_async_request("http://test2", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function
print('end2', res)send_async_request- aiohttp
@asyncio.coroutine
def send_async_request(url, method='GET'):
with aiohttp.ClientSession() as session:
resp = yield from session.get(url, timeout=60)
if resp.status == 200:
return resp.status, response
else:
return resp.status, False发布于 2017-07-31 08:38:51
当您的测试被标记为pytest.mark.asyncio时,它们就变成了协同,所以您可以使用await语法:
@pytest.mark.asyncio
async def test_sleep(event_loop):
result = await asyncio.sleep(1, result=3, loop=event_loop)
assert result == 3编辑:另一个例子,多个sleep操作:
@pytest.mark.asyncio
async def test_multiple_sleep(event_loop):
tasks = [event_loop.create_task(asyncio.sleep(1, result=x))
for x in range(10)]
results = await asyncio.gather(*tasks)
assert results == list(range(10))https://stackoverflow.com/questions/45410434
复制相似问题