首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python中使用asyncio的字典流式处理列表

Python中使用asyncio的字典流式处理列表
EN

Stack Overflow用户
提问于 2020-07-20 02:34:17
回答 1查看 413关注 0票数 0

我正在开发Python服务器/客户端应用程序,其中服务器从客户端接收一些数据,并根据这些数据从嵌入式k/v存储中收集字典列表并将其流式传输回来。我在这里放了一个重现错误的代码。这就是为什么我把所有东西都放在服务器端的单独函数中(客户端发送不同的请求)的原因。

问题是服务器发送的速度比客户端可以使用的速度快,客户端一次读取几个响应,有时只是消息的一部分被截断。我认为writelines/readline对可以适当地从套接字中读取,但我想我遗漏了一些东西。写/排出也会重载套接字,一旦读取了多个结果,客户端就会失败,因为分块的序列化字典被读取到orjson.loads。

解决这个问题的正确方法是什么?提前谢谢你!

服务器:

代码语言:javascript
复制
import orjson

async def getResult(cnt : int):
    await asyncio.sleep(0)
    result = []
    for i in range(cnt):
        result.append({"key" : i})
    return result

async def send(writer, list_of_dict):
    for r in list_of_dict:
        print(f"\nSending: {r}")
        writer.writelines([orjson.dumps(r)])
        await writer.drain()
    # sending END signal
    writer.writelines([orjson.dumps("END")])
    await writer.drain()

async def handleClient(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Connection from {addr}")
    data = await reader.readline()
    message = orjson.loads(data)
    print(f"Received {message} from {addr}")
    counter = message["send_me"]
    responses = await getResult(counter)
    await send(writer, responses)
    print("Close the client socket")
    writer.close()


loop = asyncio.get_event_loop()
coro = asyncio.start_server(handleClient, '127.0.0.1', 4000, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
 
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

客户端

代码语言:javascript
复制
import asyncio
import orjson
 
async def async_client(loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 4000, loop=loop)
    counter = 5
    print(f"Request counter: {counter}")
    # in real life the message is a complex dictionary
    msg = {"send_me" : counter}
    writer.writelines([orjson.dumps(msg)])
    #without write_eof the server reader.readline() waits for data and blocks
    if writer.can_write_eof():
        writer.write_eof()

    while True:
        data = await reader.readline()
        if data:
            print(data)
            r = orjson.loads(data)
            print(f"Received: {r}")
            if r == "END":
                print("server completed")
                break
        else:
            await asyncio.sleep(0.1)

    print('Close the socket')
    writer.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(async_client(loop))
loop.close()

错误:

代码语言:javascript
复制
>python echo_client.py
Request counter: 5

b'{"key":0}{"key":1}{"key":2}{"key":3}{"key":4}"END"'

Traceback (most recent call last):
  File "echo_client.py", line 32, in <module>
    loop.run_until_complete(async_client(loop))
  File "C:\Program Files (x86)\Anaconda\lib\asyncio\base_events.py", line 587, in run_until_complete
    return future.result()
  File "echo_client.py", line 21, in async_client
    r = orjson.loads(data)

orjson.JSONDecodeError: trailing characters at line 1 column 10: line 1 column 1 (char 0)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-20 05:23:40

我认为问题要简单得多:writelines并不像你想象的那样工作。它不会插入换行符,它只是写入您给它的任何数据。这就是为什么您的客户端的readline()会将有效负载和"END"连接在一起。这也是为什么在另一个方向上需要write_eof的原因。

如果想要写一行,那么只需在有效负载后写一个换行符(字节)即可。您可以将其抽象到为您处理它的函数中:

代码语言:javascript
复制
async def write_msg(writer, msg):
    writer.write(orjson.dumps(msg))
    writer.write('\n')
    await writer.drain()

async def read_msg(reader):
    line = await reader.readline()
    return orjson.loads(line)

您可以在客户端和服务器上使用它们进行通信。

顺便说一句,您可能应该切换到较新的asyncio.run()应用程序接口,它创建并正确地销毁具有单个异步入口点的事件循环。您的服务器设置如下所示:

代码语言:javascript
复制
async def main():
    await asyncio.start_server(handleClient, '127.0.0.1', 4000)
    await server.wait_closed()

asyncio.run(main())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62984212

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档