首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Asyncio未来ThreadExecutor

Asyncio未来ThreadExecutor
EN

Stack Overflow用户
提问于 2022-05-04 12:11:52
回答 1查看 94关注 0票数 0

我正在尝试将这个转换器(从S3到JSON)转换为多线程应用程序,这样我就可以加快多个文件(拥有985)的执行速度。由于一个给定的文件将是大约1gb,我想发送这些文件中的8个在同一时间被解析。

每当我运行它时,我都会得到:RuntimeWarning: coroutine 'process_object' was never awaited

下面是高级别的代码:

代码语言:javascript
复制
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3) # list of prefixes for S3

    loop = asyncio.get_event_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))

    loop.close()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-05-04 14:02:10

我已经修改和简化了您的代码--我不知道为什么要将线程池的未来和异步结合起来,如果您想限制在asyncio中可以使用信号量的工作人员的数量

下面是不使用并发期货和简化代码的代码,因为我无法在本地完全复制上述错误

试试这个:

代码语言:javascript
复制
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")


async def process_objects_bg(objects):
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp


if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72112756

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档