I'm trying to convert this converter (from S3 to JSON) into a multithreaded application so that I can speed up execution across multiple files (there are 985 of them). Since a given file will be around 1 GB, I want to dispatch 8 of these files to be parsed at the same time.
Whenever I run it, I get: RuntimeWarning: coroutine 'process_object' was never awaited
Here is the high-level code:
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"
if "__main__" == __name__:
objects = get_objects(top_n=3) # list of prefixes for S3
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
asyncio.wrap_future(future)
for future in [
loop.run_in_executor(executor, process_object, url) for url in objects
]
]
results = loop.run_until_complete(asyncio.gather(*futures))
    loop.close()
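The warning comes from how run_in_executor is being used: it calls process_object(url) in a worker thread, and since process_object is an async def, that call only creates a coroutine object; nothing ever awaits it. A minimal standalone sketch (standard library only, illustrative names) that triggers the same warning:

import asyncio
import concurrent.futures

async def work():
    return 42

loop = asyncio.new_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
    # run_in_executor calls work() in a thread; work is a coroutine
    # function, so the call only builds a coroutine object
    future = loop.run_in_executor(executor, work)
    result = loop.run_until_complete(future)

print(type(result))  # <class 'coroutine'> -- never awaited
loop.close()
# on interpreter exit: RuntimeWarning: coroutine 'work' was never awaited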
I've modified and simplified your code -- I'm not sure why you are combining thread-pool futures with asyncio here. If you want to limit the number of workers, you can use a semaphore in asyncio (see the sketch after the code below).
Below is the code without concurrent.futures, simplified, since I can't fully replicate the above error locally.
Try this:
import asyncio
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    # note: this download call is synchronous, so it blocks the event loop
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")

async def process_objects_bg(objects):
    # schedule all the coroutines on the loop and wait for them together
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp

if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
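Since the question wants 8 files in flight at a time, here is a minimal sketch of the semaphore approach mentioned above. It reuses process_object and get_objects from the code above; the limit of 8 and the count of 985 come from the question, and everything else is an assumption:

import asyncio

MAX_CONCURRENT = 8  # the question wants 8 files parsed at a time

async def process_object_limited(sem, filename):
    # the semaphore lets at most MAX_CONCURRENT coroutines past this point
    async with sem:
        return await process_object(filename)

async def process_objects_bg(objects):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(
        *[process_object_limited(sem, url) for url in objects]
    )

if __name__ == "__main__":
    objects = get_objects(top_n=985)  # all 985 prefixes from the question
    asyncio.run(process_objects_bg(objects))

One caveat: s3.download_from_s3 is a synchronous call, so even with a semaphore only one download runs at a time and it blocks the event loop while it runs. Wrapping it as await asyncio.to_thread(s3.download_from_s3, filename, save_file) (Python 3.9+) moves it to a worker thread and keeps the loop responsive.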
https://stackoverflow.com/questions/72112756