我正在维护一个项目,在这个项目中,我需要在已经在事件循环中运行的同步函数中安排一个协同线。
我的问题归结为:
import asyncio
class SomeScheduler:
def __init__(self):
self.workers = []
# This is not a coroutine yet
def close(self):
for worker in self.workers:
worker.close()
def register_worker(self, worker):
self.workers.append(worker)
async def run(self):
for _ in range(3):
print("Doing stuff")
coros = map(lambda x: x.work(), self.workers)
await asyncio.gather(*coros)
await asyncio.sleep(1)
class SomeWorkerA:
def close(self):
print("Closing WorkerA")
async def work(self):
print("Working WorkerA")
await asyncio.sleep(0.2)
print("Done WorkerA")
class SomeWorkerB:
def close(self):
print("Closing WorkerB")
async def work(self):
print("Working WorkerB")
await asyncio.sleep(0.4)
print("Done WorkerB")
async def main():
sched = SomeScheduler()
sched.register_worker(SomeWorkerA())
sched.register_worker(SomeWorkerB())
try:
await sched.run()
finally:
sched.close()
print("Bye")
asyncio.run(main())由于历史原因,SomeScheduler.close()不是一个协同器,我不能更改API (在团队中不进行大量讨论)。
现在我有了一种新型的工人:
class SomeWorkerC:
async def close(self):
print("Closing WorkerC")
await asyncio.sleep(10)
print("Done closing WorkerC")
async def work(self):
print("Working WorkerC")
await asyncio.sleep(0.4)
print("Done WorkerC")如果我在sched.register_worker(SomeWorkerC())函数中添加了main(),那么问题是SomeWorkerC.close()没有被执行,我得到了以下错误消息:
RuntimeWarning: coroutine 'SomeWorkerC.close' was never awaited
co = None这是有意义的,所以我想像这样改变SomeScheduler.close():
class SomeScheduler:
def __init__(self):
self.workers = []
def close(self):
for worker in self.workers:
co = worker.close()
if isinstance(co, types.CoroutineType):
loop = asyncio.get_running_loop()
loop.create_task(co)只有部分正确的stdout输出是:
...
Done WorkerA
Done WorkerB
Done WorkerC
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC但是,我也希望执行print("Done closing WorkerC"),但是从stdout输出中可以看出,情况并非如此。loop.create_task()的文档显示:
计划执行一个协同任务。返回一个Task对象。
因此,我假设整个任务将被执行,但似乎只有在第一个await被执行之前,其余的任务才会被执行。我不能执行loop.run_forever()或loop.run_until_complete(co),因为循环仍然在运行。
我甚至尝试用另一个循环替换这个循环:
class SomeScheduler:
def __init__(self):
self.workers = []
def close(self):
for worker in self.workers:
co = worker.close()
if isinstance(co, types.CoroutineType):
current_loop = asyncio.get_running_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.run(co)但是这在RuntimeError: asyncio.run() cannot be called from a running event loop中失败了,如果我执行asyncio.gather(co),那么第一部分将被执行,但是这将引发:
...
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError此时,我甚至不确定在不改变API和使SomeScheduler.close()成为一个共同例程的情况下,我是否能够解决这个问题。我怎么才能解决这个问题?
发布于 2020-04-24 06:57:41
您不能这样做,因为工人的.close协同器中的异步代码永远不会在正在运行的事件循环中运行,因为它目前正在被SomeScheduler的同步.close阻塞;也就是说,在调度程序的.close停止阻塞事件循环之前,异步工作器不会被关闭,这显然是不可能的,因为工人的关闭位于调度程序关闭的下游。
解决这一问题的方法是调用调度程序的.close的上游异步关闭;
def shutdown():
await scheduler.async_close() # close async workers
scheduler.close() # close sync workershttps://stackoverflow.com/questions/61277342
复制相似问题