首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Python3中将异步生成流转换为类文件对象?

如何在Python3中将异步生成流转换为类文件对象?
EN

Stack Overflow用户
提问于 2019-12-20 00:42:14
回答 2查看 565关注 0票数 2

所以我做了一个webservice (基于starlette),它的端点接受二进制体。我想把这个二进制的身体提供给fastavro。

Starlette doc says,我可以用request.stream()以异步流的形式访问原始数据。

代码语言:javascript
复制
async for chunk in request.stream():
    # do something with chunk...

现在,我要将流提供给fastavro。问题是,fastavro reader需要一个类似文件的输入流:

代码语言:javascript
复制
with open('some-file.avro', 'rb') as fo:
    avro_reader = reader(fo)

我的问题是,有没有一种干净的方法可以把这个异步数据流转换成一个类似文件的数据流?

我想我可以实现一个具有read()方法的对象,该方法等待并返回request.stream返回的数据。但是如果调用者传递一个size,我需要一个内存缓冲区,不是吗?会不会是基于BufferedRWPair的东西?

或者,在将整个流提供给fastavro之前,唯一的方法是先将整个流存储到磁盘或内存中吗?

提前感谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-12-20 17:53:59

我最终使用了SpooledTemporaryFile:

代码语言:javascript
复制
data_file = SpooledTemporaryFile(mode='w+b',
        max_size=MAX_RECEIVED_DATA_MEMORY_SIZE)
async for chunk in request.stream():
    data_file.write(chunk)
data_file.seek(0)
avro_reader = reader(data_file)

这不是我设想的理想解决方案(以某种方式在输入和输出之间直接传输数据),但仍然足够好……

票数 2
EN

Stack Overflow用户

发布于 2021-10-01 19:30:55

我遇到了同样的问题,并编写了紧凑的类StreamingBody。它做的正是我所需要的。

代码语言:javascript
复制
from typing import AsyncIterator
import asyncio


class AsyncGen:
    def __init__(self, block_count, block_size) -> None:
        self.bc = block_count
        self.bs = block_size

    def __aiter__(self):
        return self

    async def __anext__(self):

        if self.bc == 0:
            raise StopAsyncIteration()

        self.bc -= 1
        return b"A" * self.bs


class StreamingBody:

    _chunks: AsyncIterator[bytes]
    _backlog: bytes

    def __init__(self, chunks: AsyncIterator[bytes]):
        self._chunks = chunks
        self._backlog = b""

    async def _read_until_end(self):

        content = self._backlog
        self._backlog = b""

        while True:
            try:
                content += await self._chunks.__anext__()
            except StopAsyncIteration:
                break

        return content

    async def _read_chunk(self, size: int):

        content = self._backlog
        bytes_read = len(self._backlog)

        while bytes_read < size:

            try:
                chunk = await self._chunks.__anext__()
            except StopAsyncIteration:
                break

            content += chunk
            bytes_read += len(chunk)

        self._backlog = content[size:]
        content = content[:size]

        return content

    async def read(self, size: int = -1):
        if size > 0:
            return await self._read_chunk(size)
        elif size == -1:
            return await self._read_until_end()
        else:
            return b""

async def main():
    async_gen = AsyncGen(11, 3)
    body = StreamingBody(async_gen)

    res = await body.read(11)
    print(f"[{len(res)}]: {res}")

    res = await body.read()
    print(f"[{len(res)}]: {res}")

    res = await body.read()
    print(f"[{len(res)}]: {res}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59413796

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档