我正致力于用所有使用asyncio和aiohttp的异步调用替换我的服务器查询工具的实现,该工具使用ThreadPoolExecutors。大多数转换都是直接的,因为网络调用是非阻塞的IO,保存响应让我陷入了一个难题。
我使用的所有示例,甚至是两个库的文档,都使用asyncio.gather()来收集所有可等待的结果。在我的例子中,这些结果可以是许多GB范围内的文件,我不想将它们存储在内存中。
解决这个问题的合适方法是什么?是否要使用asyncio.as_completed(),然后:
for f in as_completed(aws):
earliest_result = await f
# Assumes `loop` defined under `if __name__` block outside coroutine
loop = get_event_loop()
# Run the blocking IO in an exectuor and write to file
_ = await loop.run_in_executor(None, save_result, earliest_result)这不是引入了一个线程(假设我默认使用ThreadPoolExecutor ),从而使它成为异步的多线程程序,而不是异步的单线程程序吗?
此外,这是否确保在任何时候都只有1个earliest_result被写入文件?我不想运行对await loop.run_in_executor(...)的调用,然后出现另一个结果,我尝试运行同一个文件;我想我可以用一个信号量来限制。
发布于 2019-05-23 21:54:25
我建议使用aiohttp Streaming API。将响应直接写入磁盘,而不是RAM,并返回文件名,而不是从gather返回响应本身。这样做根本不会使用大量内存。这是我的意思的一个小演示:
import asyncio
import aiofiles
from aiohttp import ClientSession
async def make_request(session, url):
response = await session.request(method="GET", url=url)
filename = url.split('/')[-1]
async for data in response.content.iter_chunked(1024):
async with aiofiles.open(filename, "ba") as f:
await f.write(data)
return filename
async def main():
urls = ['https://github.com/Tinche/aiofiles',
'https://github.com/aio-libs/aiohttp']
async with ClientSession() as session:
coros = [make_request(session, url) for url in urls]
result_files = await asyncio.gather(*coros)
print(result_files)
asyncio.run(main())发布于 2019-03-08 22:54:00
在我的例子中,这些结果可以是许多GB范围内的文件,我不想将它们存储在内存中。
如果我是正确的,在你的代码中,单个aws意味着下载一个文件,你可能会面临以下问题:虽然as_completed允许将数据从内存快速交换到硬盘,但所有并行运行的aws同时在内存中存储各自的数据(缓存部分下载的文件)。
要避免这种情况,首先需要使用信号量来确保并行下载的文件不会太多,从而防止RAM的过度使用。
下面是使用semaphore的示例。
是否引入了线程(假设我默认使用ThreadPoolExecutor ),从而使它成为异步的多线程程序,而不是异步的单线程程序?
我不确定,我理解你的问题,但是的,你的代码将使用线程,但只有save_result将在这些线程中执行。所有其他代码仍然在单个主线程中运行。这里没什么不好的。
此外,这是否确保在任何时候都只有1个earliest_result被写入文件?
是的,它是*。准确地说,代码片段最后一行的关键字await将确保这一点:
_ = await loop.run_in_executor(None, save_result, earliest_result)您可以将其理解为:“开始异步执行run_in_executor,并在此行挂起执行流程,直到run_in_executor完成并返回结果”。
*是的,如果你没有在第一个地方并行运行多个for f in as_completed(aws)循环。
https://stackoverflow.com/questions/55054965
复制相似问题