首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >取消python异步中的嵌套协同

取消python异步中的嵌套协同
EN

Stack Overflow用户
提问于 2016-12-05 16:50:13
回答 2查看 7.9K关注 0票数 2

在我的申请中,我有一个协同线,它可能等待其他几个协同线,而每个协同线(如果这个协同线)可能等待另一个协同线,等等。如果其中一个协同机制失败,则没有必要执行所有其他协同机制,这还没有执行。(在我的例子中,这甚至是有害的,我想启动几个回滚协同器。)那么,如何取消所有嵌套协同线的执行呢?以下是我现在所拥有的:

代码语言:javascript
复制
import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    await asyncio.gather(bar1(), bar2())


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

这显然是行不通的。如您所见,foo协同器已按我的要求被取消,但bar2仍在运行:

代码语言:javascript
复制
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

所以,我肯定做错了什么。这里的正确方法是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-12-06 12:06:22

当您调用task_bar.cancel()时,任务已经完成,因此没有效果。作为收集文档状态

如果return_exceptions为真,则任务中的异常将与成功的结果相同,并收集在结果列表中;否则,第一个引发的异常将立即传播到返回的未来。

这正是正在发生的事情,稍微修改一下您的task_bar协同:

代码语言:javascript
复制
async def bar():
    try:
        await asyncio.gather(bar1(), bar2())
    except Exception:
        print("Got a generic exception on bar")
        raise

产出:

代码语言:javascript
复制
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

我还在task_bar调用之前打印task_bar.cancel(),注意它已经完成了,所以调用cancel没有任何影响。

就解决方案而言,我认为生成协同线需要处理它计划的协同线的取消,因为我无法找到一种方法来在协同线完成后检索它们(除了滥用Task.all_tasks,这听起来不对)。

尽管如此,我不得不使用wait而不是gather,然后返回第一个异常,下面是一个完整的示例:

代码语言:javascript
复制
import asyncio


async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo done')


async def bar():
    done, pending = await asyncio.wait(
        [bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)

    for task in pending:
        task.cancel()

    for task in done:
        task.result()  # needed to raise the exception if it happened


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)


async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        print(task_bar)
        task_foo.cancel()
        task_bar.cancel()

    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

其中产出:

代码语言:javascript
复制
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4

不太好,但很管用。

票数 3
EN

Stack Overflow用户

发布于 2016-12-05 22:48:44

据我所知,在取消协同线本身时,不可能自动取消协同线的所有子任务。所以你必须手动清理子任务。当等待asyncio.gather未来时抛出异常时,可以通过Gathering_future对象的_children属性访问剩余的任务。你的例子奏效了:

代码语言:javascript
复制
import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    gathering = asyncio.gather(bar1(), bar2())
    try:
        await gathering
    except Exception:
        # cancel all subtasks of this coroutine
        [task.cancel() for task in gathering._children]
        raise

async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')

async def bar2():
    for i in range(5):
        print('Bar2', i)
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            # you can cleanup here
            print("Bar2 cancelled")
            break
    else:
        print('Bar2 done')

async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        task = asyncio.gather(task_foo, task_bar)
        await task
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

返回

代码语言:javascript
复制
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Bar2 cancelled
One task failed. Canceling all
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40979254

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档