如果异步任务task_parent创建子任务task_child,但task_parent由于创建task_child后抛出的异常而被取消,那么task_child是否也会自动取消(如果没有asyncio.shield保护)?
例如,在以下代码中:
async def f():
t1 = asyncio.create_task(coroutine1())
t2 = asyncio.create_task(coroutine2())
r1, = await asyncio.gather(t1)
r3 = await process_result(r1) # process_result throws an exception
r2, = await asyncio.gather(t2)
return await process_results(r2, r3)如果process_result(r1)抛出异常,t2是否会被自动取消(并随后回收垃圾)?
如果我没有使用asyncio.gather,而是直接等待一项任务,该怎么办:
async def f():
t1 = asyncio.create_task(coroutine1())
t2 = asyncio.create_task(coroutine2())
r1, = await t1
r3 = await process_result(r1) # process_result throws an exception
r2, = await t2
return await process_results(r2, r3)如果process_result(r1)抛出异常,在这种情况下t2是否也会被自动取消?
发布于 2022-01-19 16:05:47
设法找到了我自己问题的答案。任务取消可以通过结构化并发来实现,而在当前版本的Python (Python3.10)中不支持结构化并发,尽管在提案之后有一个TaskGroups引入了佩普654。
幸运的是,有一个AnyIO库,它在异步之上实现三类结构化并发。我问题中的示例可以在AnyIO中重写为具有可取消的任务:
import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack
async def coroutine1(send_stream):
async with send_stream:
await send_stream.send(1)
async def coroutine2(send_stream):
async with send_stream:
await asyncio.sleep(1)
await send_stream.send(2)
async def process_result(receive_stream, send_stream):
async with AsyncExitStack() as stack:
rs = await stack.enter_async_context(receive_stream)
ss = await stack.enter_async_context(send_stream)
res_rs = await rs.receive()
raise Exception
await ss.send(res_rs + 1)
async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
task_status.started()
async with AsyncExitStack() as stack:
rs_2 = await stack.enter_async_context(receive_stream_2)
rs_3 = await stack.enter_async_context(receive_stream_3)
res_rs_2 = await rs_2.receive()
res_rs_3 = await rs_3.receive()
return res_rs_2 + res_rs_3
async def f():
async with create_task_group() as tg:
send_stream_1, receive_stream_1 = create_memory_object_stream(1)
tg.start_soon(coroutine1, send_stream_1)
send_stream_2, receive_stream_2 = create_memory_object_stream(1)
tg.start_soon(coroutine2, send_stream_2)
send_stream_3, receive_stream_3 = create_memory_object_stream(1)
tg.start_soon(process_result, receive_stream_1, send_stream_3)
# process_result will raise an Exception which will cancel all tasks in tg group
result = await process_results(receive_stream_2, receive_stream_3)
print(result)
asyncio.run(f())发布于 2022-01-06 05:35:04
简单地说,你所有问题的答案都是“否”。
在异步中,没有父任务与子任务这样的概念,也没有任务之间的任何层次关系。只有一个“级别”--所有任务都是等价的。
在某种程度上,可以通过显式取消最后块中的任务来强制依赖项,例如,
async def f():
t1 = asyncio.create_task(coroutine1())
t2 = asyncio.create_task(coroutine2())
try:
r1 = await t1
r3 = await process_result(r1) # process_result throws an exception
r2 = await t2
return await process_results(r2, r3)
finally:
t1.cancel()
t2.cancel()但这不会取消t1和t2碰巧创建的任何任务。它只创建一个依赖级别。
在最近的几个小项目中,我成功地使用这个类将任务组织到层次结构中:
import asyncio
import logging
logger = logging.getLogger(__name__)
class BagContextError(Exception):
pass
class PBag:
def __init__(self):
self.futures = set()
self.exceptions = []
self.done = asyncio.Event()
self.done.set()
self._opened = False
self._closed = False
@property
def is_open(self):
return self._opened and not self._closed
def __await__(self):
yield from self.done.wait().__await__()
async def __aenter__(self):
if self._opened:
raise BagContextError("Already open")
self._opened = True
return self
async def __aexit__(self, exc_type, _exc_value, _traceback):
logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
exc_type, stack_info=True)
self._closed = True
await self.aclose()
if self.exceptions:
n = 1 if exc_type is None else 0
for ex in self.exceptions[n:]:
try:
raise ex
except Exception:
logging.exception("Suppressed exception")
if exc_type is None:
raise self.exceptions[0]
def until_done(self):
return self.done.wait()
def create_task(self, coro):
if self._closed:
raise BagContextError("Bag closed")
t = asyncio.create_task(coro)
self.add_future(t)
return t
def add_future(self, fut):
if self._closed:
raise BagContextError("Bag closed")
self.futures.add(fut)
fut.add_done_callback(self._future_done)
self.done.clear()
def close(self):
for w in self.futures:
w.cancel()
async def aclose(self):
self.close()
await self.until_done()
def _future_done(self, fut):
try:
self.futures.remove(fut)
except KeyError:
pass
if not self.futures:
self.done.set()
try:
fut.result()
except asyncio.CancelledError:
pass
except Exception as ex:
self.exceptions.append(ex)是一个上下文管理器。在其上下文中,任务是由PBag.create_task而不是asyncio.create_task创建的。对象跟踪它的依赖任务,并在出现异常时关闭它们,如果上下文退出,或者如果调用了close()方法。
如果您使用它构建整个程序,则任务将按层次顺序排列,当最外层的任务被取消时,整个结构将被优雅地展开。如果您只在某些地方使用它,而没有在其他地方使用它(例如,如果您在某些地方编写asyncio.create_task ),则该展开可能不会很好地工作。
我没有太多的经验,所以,当然,可能会有未发现的bug。下面是一个演示程序:
async def main():
async def task1():
print("Task1 started", time.ctime())
await asyncio.sleep(2)
print("Task1 finished", time.ctime())
async def task2():
print("Task2 started", time.ctime())
await asyncio.sleep(3)
raise Exception("Task 2 error")
async def task3(bag):
bag.create_task(task2())
print("Task 3 done")
try:
async with PBag() as bag:
bag.create_task(task1())
bag.create_task(task3(bag))
await bag.until_done()
bag.create_task(task1())
await bag
except asyncio.CancelledError:
traceback.print_exc()
except Exception:
traceback.print_exc()
print("Bag closed", time.ctime())
asyncio.create_task(task1())
print("Program finished", time.ctime())
if __name__ == "__main__":
asyncio.run(main())https://stackoverflow.com/questions/70596654
复制相似问题