首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何与异步并发运行无限循环?

如何与异步并发运行无限循环?
EN

Stack Overflow用户
提问于 2017-12-11 03:20:29
回答 2查看 9.1K关注 0票数 7

如何在等待时继续下一个循环?例如:

代码语言:javascript
复制
async def get_message():
    # async get message from queue
    return message

async process_message(message):
    # make some changes on message
    return message

async def deal_with_message(message):
    # async update some network resource with given message

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        await deal_with_message(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

如何使while True循环并发?如果正在等待deal_with_message,则可以转到下一个循环并运行get_message

编辑过的

我想我找到了一个解决办法:

代码语言:javascript
复制
async def main():
    asyncio.ensure_future(main())
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-12-11 07:34:56

你的解决方案会奏效的,不过我觉得有问题。

代码语言:javascript
复制
async def main():
    asyncio.ensure_future(main())
    # task finishing

一旦main启动,它就会创建新任务并立即发生(ensure_future会立即创建任务),而这个任务的实际完成需要时间。我想这可能会导致产生大量的任务,从而耗尽您的RAM。

此外,这意味着潜在的任何大量任务都可以同时运行。它可以耗尽您的网络吞吐量或可以同时打开的套接字数量(想象一下您正在绑定下载1000个urls -不会发生任何好事)。

在并发世界中,这个问题通常是通过限制可以与一些合理的值并发运行的事物的数量来实现的(如能解决 )。但是,在您的情况下,我认为手动跟踪运行的任务数量并手动填充它更方便:

代码语言:javascript
复制
import asyncio
from random import randint


async def get_message():
    message = randint(0, 1_000)
    print(f'{message} got')
    return message


async def process_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} processed')
    return message


async def deal_with_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} dealt')


async def utilize_message():
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)


parallel_max = 5  # don't utilize more than 5 msgs parallely
parallel_now = 0


def populate_tasks():
    global parallel_now
    for _ in range(parallel_max - parallel_now):
        parallel_now += 1
        task = asyncio.ensure_future(utilize_message())
        task.add_done_callback(on_utilized)


def on_utilized(_):
    global parallel_now
    parallel_now -= 1
    populate_tasks()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        populate_tasks()
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

产出如下:

代码语言:javascript
复制
939 got
816 got
737 got
257 got
528 got
939 processed
816 processed
528 processed
816 dealt
589 got
939 dealt
528 dealt
712 got
263 got
737 processed
257 processed
263 processed
712 processed
263 dealt
712 dealt
386 got
708 got
589 processed
257 dealt
386 processed
708 processed
711 got
711 processed

这里的重要部分是我们如何得到下一条消息,只有在运行的任务数量减少到少于5个之后才能被使用。

Upd:

是的,如果不需要动态更改最大运行号,信号量似乎更方便。

代码语言:javascript
复制
sem = asyncio.Semaphore(5)


async def main():
    async with sem:
        asyncio.ensure_future(main())
        await utilize_message()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(main())
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
票数 7
EN

Stack Overflow用户

发布于 2017-12-11 04:05:43

最简单的解决方案是asyncio.ensure_future

代码语言:javascript
复制
async def main():
    tasks = []
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        task = asyncio.ensure_future(coroutine) # starts running coroutine
        tasks.append(task)
    await asyncio.wait(tasks)

如果你所有的任务都能在最后等待的话,你自己跟踪任务是可选的。

代码语言:javascript
复制
async def main():
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        asyncio.ensure_future(coroutine)
    tasks = asyncio.Task.all_tasks()
    await asyncio.wait(tasks)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47745989

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档