在我的申请中,我有一个协同线,它可能等待其他几个协同线,而每个协同线(如果这个协同线)可能等待另一个协同线,等等。如果其中一个协同机制失败,则没有必要执行所有其他协同机制,这还没有执行。(在我的例子中,这甚至是有害的,我想启动几个回滚协同器。)那么,如何取消所有嵌套协同线的执行呢?以下是我现在所拥有的:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
await asyncio.gather(bar1(), bar2())
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()这显然是行不通的。如您所见,foo协同器已按我的要求被取消,但bar2仍在运行:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4所以,我肯定做错了什么。这里的正确方法是什么?
发布于 2016-12-06 12:06:22
当您调用task_bar.cancel()时,任务已经完成,因此没有效果。作为收集文档状态
如果return_exceptions为真,则任务中的异常将与成功的结果相同,并收集在结果列表中;否则,第一个引发的异常将立即传播到返回的未来。
这正是正在发生的事情,稍微修改一下您的task_bar协同:
async def bar():
try:
await asyncio.gather(bar1(), bar2())
except Exception:
print("Got a generic exception on bar")
raise产出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4我还在task_bar调用之前打印task_bar.cancel(),注意它已经完成了,所以调用cancel没有任何影响。
就解决方案而言,我认为生成协同线需要处理它计划的协同线的取消,因为我无法找到一种方法来在协同线完成后检索它们(除了滥用Task.all_tasks,这听起来不对)。
尽管如此,我不得不使用wait而不是gather,然后返回第一个异常,下面是一个完整的示例:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo done')
async def bar():
done, pending = await asyncio.wait(
[bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
task.cancel()
for task in done:
task.result() # needed to raise the exception if it happened
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
await asyncio.sleep(0.5)
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
await asyncio.gather(task_foo, task_bar)
except Exception:
print('One task failed. Canceling all')
print(task_bar)
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()其中产出:
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4不太好,但很管用。
发布于 2016-12-05 22:48:44
据我所知,在取消协同线本身时,不可能自动取消协同线的所有子任务。所以你必须手动清理子任务。当等待asyncio.gather未来时抛出异常时,可以通过Gathering_future对象的_children属性访问剩余的任务。你的例子奏效了:
import asyncio
async def foo():
for i in range(5):
print('Foo', i)
await asyncio.sleep(0.5)
print('Foo2 done')
async def bar():
gathering = asyncio.gather(bar1(), bar2())
try:
await gathering
except Exception:
# cancel all subtasks of this coroutine
[task.cancel() for task in gathering._children]
raise
async def bar1():
await asyncio.sleep(1)
raise Exception('Boom!')
async def bar2():
for i in range(5):
print('Bar2', i)
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# you can cleanup here
print("Bar2 cancelled")
break
else:
print('Bar2 done')
async def baz():
for i in range(5):
print('Baz', i)
await asyncio.sleep(0.5)
async def main():
task_foo = asyncio.Task(foo())
task_bar = asyncio.Task(bar())
try:
task = asyncio.gather(task_foo, task_bar)
await task
except Exception:
print('One task failed. Canceling all')
task_foo.cancel()
task_bar.cancel()
print('Now we want baz')
await baz()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()返回
Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Bar2 cancelled
One task failed. Canceling all
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4https://stackoverflow.com/questions/40979254
复制相似问题