首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用异步协程并行运行函数?

使用异步协程并行运行函数?
EN

Stack Overflow用户
提问于 2021-01-27 08:13:52
回答 1查看 196关注 0票数 3

我有下面的代码,从数据库(read_db)读取数据,并将数据写入拼图文件(data.to_parquet)。这两个I/O操作都需要一段时间才能运行。

代码语言:javascript
复制
def main():
    while id < 1000:
       logging.info(f'reading - id: {id}')
       data = read_db(id) # returns a dataframe

       logging.info(f'saving - id: {id}')
       data.to_parquet(f'{id}.parquet')
       logging.info(f'saved - id: {id}')

       id += 1

它很慢,所以我希望同时运行read_db(n+1)to_parquet(n)。我需要让id的每一步都按顺序完成(read_db(n+1)需要在read_db(n)之后运行,data.to_parquet(n+1)需要在data.to_parquet(n)之后运行)。下面是异步版本

代码语言:javascript
复制
def async_wrap(f):
    @wraps(f)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        p = partial(f, *args, **kwargs)
        return await loop.run_in_executor(executor, p)
    return run

async def main():
    read_db_async = async_wrap(read_db)
    while id < 1000:
       logging.info(f'reading - id: {id}')
       data = await read_db_async(id) # returns a dataframe

       logging.info(f'saving - id: {id}')
       to_parquet_async = async_wrap(data.to_parquet)
       await data.to_parquet(f'{id}.parquet')
       logging.info(f'saved - id: {id}')

       id += 1

asyncio.get_event_loop().run_until_complete(main())

我希望看到一些乱七八糟的日志:

代码语言:javascript
复制
reading - id: 1
saving - id: 1      (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....

但是,实际上同步代码的日志是相同的吗?

代码语言:javascript
复制
reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-27 17:31:10

您可以通过使用gather或等效工具来同时运行read_db(n+1)to_parquet(n)

代码语言:javascript
复制
async def main():
    read_db_async = async_wrap(read_db)
    prev_to_parquet = asyncio.sleep(0)  # no-op

    for id in range(1, 1000):
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        to_parquet_async = async_wrap(data.to_parquet)
        prev_to_parquet = to_parquet_async(f'{id}.parquet')

    await prev_to_parquet
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65911158

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档