首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >不使用异步编写EventLoop

不使用异步编写EventLoop
EN

Stack Overflow用户
提问于 2018-12-13 17:39:47
回答 2查看 814关注 0票数 4

我非常熟悉python的异步、python中的异步编程、协同例程等。我希望能够使用自己定制的事件环执行几个协同例程。

我很好奇,我是否可以编写自己的事件循环,而无需导入异步

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-12-13 18:07:27

我希望能够使用我自己定制的事件循环来执行几个共同例程。

异步事件循环经过了良好的测试,可以很容易地扩展到确认非异步事件。如果您描述了实际的用例,它可能更容易帮助。但是,如果您的目标是学习异步编程和协作,请继续阅读。

我很好奇我是否可以编写自己的事件循环,而不需要导入异步

这是绝对可能的--毕竟异步本身只是一个库--但是要使您的事件循环变得有用还需要做一些工作。参见David的这场精彩的演讲,在这里他演示了在现场观众面前编写事件循环。(不要因为大卫使用旧的yield from语法而推迟-- await的工作方式完全相同。)

票数 3
EN

Stack Overflow用户

发布于 2018-12-29 15:03:15

好的,所以我在某个地方找到了一个例子(对不起,不记得在哪里,没有链接),然后做了一些改动。

一个eventloopco-routins,甚至没有导入asyncio

代码语言:javascript
复制
import datetime
import heapq
import types
import time

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))

        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def async_sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until

    return actual - now


async def countdown(label, total_seconds_wait, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await async_sleep(delay)
    print(label, 'starting after waiting', delta)
    while total_seconds_wait:
        print(label, 'T-minus', total_seconds_wait)
        waited = await async_sleep(1)
        total_seconds_wait -= 1
    print(label, 'lift-off!')


def main():
    loop = SleepingLoop(countdown('A', 5, delay=0),
                        countdown('B', 3, delay=2),
                        countdown('C', 4, delay=1))
    start = datetime.datetime.now()
    loop.run_until_complete()

    print('Total elapsed time is', datetime.datetime.now() - start)



if __name__ == '__main__':
    main()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53767355

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档