所以我做了一个webservice (基于starlette),它的端点接受二进制体。我想把这个二进制的身体提供给fastavro。
Starlette doc says,我可以用request.stream()以异步流的形式访问原始数据。
async for chunk in request.stream():
# do something with chunk...现在,我要将流提供给fastavro。问题是,fastavro reader需要一个类似文件的输入流:
with open('some-file.avro', 'rb') as fo:
avro_reader = reader(fo)我的问题是,有没有一种干净的方法可以把这个异步数据流转换成一个类似文件的数据流?
我想我可以实现一个具有read()方法的对象,该方法等待并返回request.stream返回的数据。但是如果调用者传递一个size,我需要一个内存缓冲区,不是吗?会不会是基于BufferedRWPair的东西?
或者,在将整个流提供给fastavro之前,唯一的方法是先将整个流存储到磁盘或内存中吗?
提前感谢!
发布于 2019-12-20 17:53:59
我最终使用了SpooledTemporaryFile:
data_file = SpooledTemporaryFile(mode='w+b',
max_size=MAX_RECEIVED_DATA_MEMORY_SIZE)
async for chunk in request.stream():
data_file.write(chunk)
data_file.seek(0)
avro_reader = reader(data_file)这不是我设想的理想解决方案(以某种方式在输入和输出之间直接传输数据),但仍然足够好……
发布于 2021-10-01 19:30:55
我遇到了同样的问题,并编写了紧凑的类StreamingBody。它做的正是我所需要的。
from typing import AsyncIterator
import asyncio
class AsyncGen:
def __init__(self, block_count, block_size) -> None:
self.bc = block_count
self.bs = block_size
def __aiter__(self):
return self
async def __anext__(self):
if self.bc == 0:
raise StopAsyncIteration()
self.bc -= 1
return b"A" * self.bs
class StreamingBody:
_chunks: AsyncIterator[bytes]
_backlog: bytes
def __init__(self, chunks: AsyncIterator[bytes]):
self._chunks = chunks
self._backlog = b""
async def _read_until_end(self):
content = self._backlog
self._backlog = b""
while True:
try:
content += await self._chunks.__anext__()
except StopAsyncIteration:
break
return content
async def _read_chunk(self, size: int):
content = self._backlog
bytes_read = len(self._backlog)
while bytes_read < size:
try:
chunk = await self._chunks.__anext__()
except StopAsyncIteration:
break
content += chunk
bytes_read += len(chunk)
self._backlog = content[size:]
content = content[:size]
return content
async def read(self, size: int = -1):
if size > 0:
return await self._read_chunk(size)
elif size == -1:
return await self._read_until_end()
else:
return b""
async def main():
async_gen = AsyncGen(11, 3)
body = StreamingBody(async_gen)
res = await body.read(11)
print(f"[{len(res)}]: {res}")
res = await body.read()
print(f"[{len(res)}]: {res}")
res = await body.read()
print(f"[{len(res)}]: {res}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())https://stackoverflow.com/questions/59413796
复制相似问题