首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >子任务的自动取消

子任务的自动取消
EN

Stack Overflow用户
提问于 2022-01-05 16:56:45
回答 2查看 317关注 0票数 0

如果异步任务task_parent创建子任务task_child,但task_parent由于创建task_child后抛出的异常而被取消,那么task_child是否也会自动取消(如果没有asyncio.shield保护)?

例如,在以下代码中:

代码语言:javascript
复制
async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1, = await asyncio.gather(t1)
    r3 = await process_result(r1) # process_result throws an exception
    r2, = await asyncio.gather(t2)
    return await process_results(r2, r3)

如果process_result(r1)抛出异常,t2是否会被自动取消(并随后回收垃圾)?

如果我没有使用asyncio.gather,而是直接等待一项任务,该怎么办:

代码语言:javascript
复制
async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1, = await t1
    r3 = await process_result(r1) # process_result throws an exception
    r2, = await t2
    return await process_results(r2, r3)

如果process_result(r1)抛出异常,在这种情况下t2是否也会被自动取消?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-01-19 16:05:47

设法找到了我自己问题的答案。任务取消可以通过结构化并发来实现,而在当前版本的Python (Python3.10)中不支持结构化并发,尽管在提案之后有一个TaskGroups引入了佩普654

幸运的是,有一个AnyIO库,它在异步之上实现三类结构化并发。我问题中的示例可以在AnyIO中重写为具有可取消的任务:

代码语言:javascript
复制
import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack

async def coroutine1(send_stream):
    async with send_stream:
        await send_stream.send(1)

async def coroutine2(send_stream):
    async with send_stream:
        await asyncio.sleep(1)
        await send_stream.send(2)

async def process_result(receive_stream, send_stream):
    async with AsyncExitStack() as stack:
        rs = await stack.enter_async_context(receive_stream)
        ss = await stack.enter_async_context(send_stream)
        res_rs = await rs.receive()
        raise Exception
        await ss.send(res_rs + 1)

async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
    task_status.started()
    async with AsyncExitStack() as stack:
        rs_2 = await stack.enter_async_context(receive_stream_2)
        rs_3 = await stack.enter_async_context(receive_stream_3)
        res_rs_2 = await rs_2.receive()
        res_rs_3 = await rs_3.receive()
        return res_rs_2 + res_rs_3
    

async def f():
    async with create_task_group() as tg:
        send_stream_1, receive_stream_1 = create_memory_object_stream(1)
        tg.start_soon(coroutine1, send_stream_1)
        send_stream_2, receive_stream_2 = create_memory_object_stream(1)
        tg.start_soon(coroutine2, send_stream_2)
        send_stream_3, receive_stream_3 = create_memory_object_stream(1)
        tg.start_soon(process_result, receive_stream_1, send_stream_3)
        # process_result will raise an Exception which will cancel all tasks in tg group
        result = await process_results(receive_stream_2, receive_stream_3)
        print(result)

asyncio.run(f())
票数 1
EN

Stack Overflow用户

发布于 2022-01-06 05:35:04

简单地说,你所有问题的答案都是“否”。

在异步中,没有父任务与子任务这样的概念,也没有任务之间的任何层次关系。只有一个“级别”--所有任务都是等价的。

在某种程度上,可以通过显式取消最后块中的任务来强制依赖项,例如,

代码语言:javascript
复制
async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    try:
        r1 = await t1
        r3 = await process_result(r1) # process_result throws an exception
        r2 = await t2
        return await process_results(r2, r3)
    finally:
        t1.cancel()
        t2.cancel()

但这不会取消t1和t2碰巧创建的任何任务。它只创建一个依赖级别。

在最近的几个小项目中,我成功地使用这个类将任务组织到层次结构中:

代码语言:javascript
复制
import asyncio
import logging

logger = logging.getLogger(__name__)

class BagContextError(Exception):
    pass

class PBag:
    def __init__(self):
        self.futures = set()
        self.exceptions = []
        self.done = asyncio.Event()
        self.done.set()
        self._opened = False
        self._closed = False
        
    @property
    def is_open(self):
        return self._opened and not self._closed
    
    def __await__(self):
        yield from self.done.wait().__await__()
        
    async def __aenter__(self):
        if self._opened:
            raise BagContextError("Already open")
        self._opened = True
        return self
    
    async def __aexit__(self, exc_type, _exc_value, _traceback):
        logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
                     exc_type, stack_info=True)
        self._closed = True
        await self.aclose()
        if self.exceptions:
            n = 1 if exc_type is None else 0
            for ex in self.exceptions[n:]:
                try:
                    raise ex
                except Exception:
                    logging.exception("Suppressed exception")
            if exc_type is None:
                raise self.exceptions[0]
    
    def until_done(self):
        return self.done.wait()
        
    def create_task(self, coro):
        if self._closed:
            raise BagContextError("Bag closed")
        t = asyncio.create_task(coro)
        self.add_future(t)
        return t
        
    def add_future(self, fut):
        if self._closed:
            raise BagContextError("Bag closed")
        self.futures.add(fut)
        fut.add_done_callback(self._future_done)
        self.done.clear()
        
    def close(self):
        for w in self.futures:
            w.cancel()
            
    async def aclose(self):
        self.close()
        await self.until_done()
        
    def _future_done(self, fut):
        try:
            self.futures.remove(fut)
        except KeyError:
            pass
        if not self.futures:
            self.done.set()
        try:
            fut.result()
        except asyncio.CancelledError:
            pass
        except Exception as ex:
            self.exceptions.append(ex)

是一个上下文管理器。在其上下文中,任务是由PBag.create_task而不是asyncio.create_task创建的。对象跟踪它的依赖任务,并在出现异常时关闭它们,如果上下文退出,或者如果调用了close()方法。

如果您使用它构建整个程序,则任务将按层次顺序排列,当最外层的任务被取消时,整个结构将被优雅地展开。如果您只在某些地方使用它,而没有在其他地方使用它(例如,如果您在某些地方编写asyncio.create_task ),则该展开可能不会很好地工作。

我没有太多的经验,所以,当然,可能会有未发现的bug。下面是一个演示程序:

代码语言:javascript
复制
async def main():
    async def task1():
        print("Task1 started", time.ctime())
        await asyncio.sleep(2)
        print("Task1 finished", time.ctime())
        
    async def task2():
        print("Task2 started", time.ctime())
        await asyncio.sleep(3)
        raise Exception("Task 2 error")
    
    async def task3(bag):
        bag.create_task(task2())
        print("Task 3 done")
        
    try:
        async with PBag() as bag:
            bag.create_task(task1())
            bag.create_task(task3(bag))
            await bag.until_done()
            bag.create_task(task1())
            await bag
    except asyncio.CancelledError:
        traceback.print_exc()
    except Exception:
        traceback.print_exc()
    print("Bag closed", time.ctime())
    asyncio.create_task(task1())
    print("Program finished", time.ctime())
        
if __name__ == "__main__":
    asyncio.run(main())
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70596654

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档