首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在非协同线方法中调度协同线

在非协同线方法中调度协同线
EN

Stack Overflow用户
提问于 2020-04-17 17:31:10
回答 1查看 297关注 0票数 0

我正在维护一个项目,在这个项目中,我需要在已经在事件循环中运行的同步函数中安排一个协同线。

我的问题归结为:

代码语言:javascript
复制
import asyncio

class SomeScheduler:
    def __init__(self):
        self.workers = []

    # This is not a coroutine yet
    def close(self):
        for worker in self.workers:
            worker.close()

    def register_worker(self, worker):
        self.workers.append(worker)

    async def run(self):
        for _ in range(3):
            print("Doing stuff")
            coros = map(lambda x: x.work(), self.workers)
            await asyncio.gather(*coros)
            await asyncio.sleep(1)


class SomeWorkerA:
    def close(self):
        print("Closing WorkerA")

    async def work(self):
        print("Working WorkerA")
        await asyncio.sleep(0.2)
        print("Done WorkerA")

class SomeWorkerB:
    def close(self):
        print("Closing WorkerB")

    async def work(self):
        print("Working WorkerB")
        await asyncio.sleep(0.4)
        print("Done WorkerB")

async def main():
    sched = SomeScheduler()
    sched.register_worker(SomeWorkerA())
    sched.register_worker(SomeWorkerB())

    try:
        await sched.run()
    finally:
        sched.close()
        print("Bye")


asyncio.run(main())

由于历史原因,SomeScheduler.close()不是一个协同器,我不能更改API (在团队中不进行大量讨论)。

现在我有了一种新型的工人:

代码语言:javascript
复制
class SomeWorkerC:
    async def close(self):
        print("Closing WorkerC")
        await asyncio.sleep(10)
        print("Done closing WorkerC")

    async def work(self):
        print("Working WorkerC")
        await asyncio.sleep(0.4)
        print("Done WorkerC")

如果我在sched.register_worker(SomeWorkerC())函数中添加了main(),那么问题是SomeWorkerC.close()没有被执行,我得到了以下错误消息:

代码语言:javascript
复制
RuntimeWarning: coroutine 'SomeWorkerC.close' was never awaited
  co = None

这是有意义的,所以我想像这样改变SomeScheduler.close()

代码语言:javascript
复制
class SomeScheduler:
    def __init__(self):
        self.workers = []

    def close(self):
        for worker in self.workers:
            co = worker.close()
            if isinstance(co, types.CoroutineType):
                loop = asyncio.get_running_loop()
                loop.create_task(co)

只有部分正确的stdout输出是:

代码语言:javascript
复制
...
Done WorkerA
Done WorkerB
Done WorkerC
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC

但是,我也希望执行print("Done closing WorkerC"),但是从stdout输出中可以看出,情况并非如此。loop.create_task()的文档显示:

计划执行一个协同任务。返回一个Task对象。

因此,我假设整个任务将被执行,但似乎只有在第一个await被执行之前,其余的任务才会被执行。我不能执行loop.run_forever()loop.run_until_complete(co),因为循环仍然在运行。

我甚至尝试用另一个循环替换这个循环:

代码语言:javascript
复制
class SomeScheduler:
    def __init__(self):
        self.workers = []

    def close(self):
        for worker in self.workers:
            co = worker.close()

            if isinstance(co, types.CoroutineType):
                current_loop = asyncio.get_running_loop()
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                asyncio.run(co)

但是这在RuntimeError: asyncio.run() cannot be called from a running event loop中失败了,如果我执行asyncio.gather(co),那么第一部分将被执行,但是这将引发:

代码语言:javascript
复制
...
Closing WorkerA
Closing WorkerB
Bye
Closing WorkerC
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

此时,我甚至不确定在不改变API和使SomeScheduler.close()成为一个共同例程的情况下,我是否能够解决这个问题。我怎么才能解决这个问题?

EN

回答 1

Stack Overflow用户

发布于 2020-04-24 06:57:41

您不能这样做,因为工人的.close协同器中的异步代码永远不会在正在运行的事件循环中运行,因为它目前正在被SomeScheduler的同步.close阻塞;也就是说,在调度程序的.close停止阻塞事件循环之前,异步工作器不会被关闭,这显然是不可能的,因为工人的关闭位于调度程序关闭的下游。

解决这一问题的方法是调用调度程序的.close的上游异步关闭;

代码语言:javascript
复制
def shutdown():
    await scheduler.async_close()  # close async workers
    scheduler.close()  # close sync workers
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61277342

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档