首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python中的异步同时函数调用

Python中的异步同时函数调用
EN

Stack Overflow用户
提问于 2022-10-04 13:39:08
回答 1查看 102关注 0票数 0

我一直在寻找与JavaScript的await Promise.all()功能相当的Python,这使我使用了asyncio.gather()。在阅读了一些解释并遵循了一些示例之后,我还没有成功地让任何东西异步工作。

任务很简单:从S3中远程提取多个文件的值,然后在所有这些都完成后收集结果。我已经在JS中完成了这一点,从12个文件中读取只需要一点时间。

代码是为FastAPI编写的,其简化形式如下所示。我知道它不能异步工作的原因是s3中读取的文件越多,所用的时间就越长。

我已经看到了这类事情的文档,但是由于它不适合我,所以我不确定我是否做错了什么,或者这在我的用例中是行不通的。我担心远程文件中使用rasterio的流在这种情况下是行不通的。

我如何更改下面的代码,以便它并发调用函数,并在它们都完成后收集下面的所有响应?我以前还没有在python中使用过这个特性,所以只需要更多的说明。

代码语言:javascript
复制
async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in s3 given an s3 path
    with rasterio.open(s3_path) as src:
        values = src.read(1, window=Window(1, 2, 1, 1))
        return values[0][0]

@app.get("/get-all")
async def get_all():
    start_time = datetime.datetime.now()
    # example paths
    s3_paths = [
        "s3:file-1",
        "s3:file-2",
        "s3:file-3",
        "s3:file-4",
        "s3:file-5",
        "s3:file-6",
    ]

    values = await asyncio.gather(
        read_from_file(s3_paths[0]),
        read_from_file(s3_paths[1]),
        read_from_file(s3_paths[2]),
        read_from_file(s3_paths[3]),
        read_from_file(s3_paths[4]),
        read_from_file(s3_paths[5]),
    )

    end_time = datetime.datetime.now()
    logger.info(f"duration: {end_time-start_time}")
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-04 22:12:09

Python异步有一种在其他线程中运行非异步代码的机制,比如对rasterio的调用,这样异步循环就不会被阻塞。

试试下面的代码:

代码语言:javascript
复制
import asyncio
from functools import partial

async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in s3 given an s3 path
    loop = asyncio.get_running_loop()
    try: 
        src = await loop.run_in_executor(None, rasterio.open, s3_path)
        values = await loop.run_in_executor(None, partial(src.read, 1, window=Window(1, 2, 1, 1))
    finally:
        src.close()  # might be interesting to paralelize this as well
    return values[0][0]

如果需要更快,您可以创建一个自定义执行器:我认为,默认的执行器将只使用n_cpu线程,当瓶颈是网络延迟时,它可能会减慢速度--大约20个线程附近的某个点可能很有趣。(这个执行器应该是一个全局资源,或者作为参数传递给您的read_from_file,并且是一个普通的concurrent.futures.ThreadpoolPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) )。

至于run_in_executor,请检查https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73948783

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档