我正在开发Python服务器/客户端应用程序,其中服务器从客户端接收一些数据,并根据这些数据从嵌入式k/v存储中收集字典列表并将其流式传输回来。我在这里放了一个重现错误的代码。这就是为什么我把所有东西都放在服务器端的单独函数中(客户端发送不同的请求)的原因。
问题是服务器发送的速度比客户端可以使用的速度快,客户端一次读取几个响应,有时只是消息的一部分被截断。我认为writelines/readline对可以适当地从套接字中读取,但我想我遗漏了一些东西。写/排出也会重载套接字,一旦读取了多个结果,客户端就会失败,因为分块的序列化字典被读取到orjson.loads。
解决这个问题的正确方法是什么?提前谢谢你!
服务器:
import orjson
async def getResult(cnt : int):
await asyncio.sleep(0)
result = []
for i in range(cnt):
result.append({"key" : i})
return result
async def send(writer, list_of_dict):
for r in list_of_dict:
print(f"\nSending: {r}")
writer.writelines([orjson.dumps(r)])
await writer.drain()
# sending END signal
writer.writelines([orjson.dumps("END")])
await writer.drain()
async def handleClient(reader, writer):
addr = writer.get_extra_info('peername')
print(f"Connection from {addr}")
data = await reader.readline()
message = orjson.loads(data)
print(f"Received {message} from {addr}")
counter = message["send_me"]
responses = await getResult(counter)
await send(writer, responses)
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handleClient, '127.0.0.1', 4000, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()客户端
import asyncio
import orjson
async def async_client(loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 4000, loop=loop)
counter = 5
print(f"Request counter: {counter}")
# in real life the message is a complex dictionary
msg = {"send_me" : counter}
writer.writelines([orjson.dumps(msg)])
#without write_eof the server reader.readline() waits for data and blocks
if writer.can_write_eof():
writer.write_eof()
while True:
data = await reader.readline()
if data:
print(data)
r = orjson.loads(data)
print(f"Received: {r}")
if r == "END":
print("server completed")
break
else:
await asyncio.sleep(0.1)
print('Close the socket')
writer.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(async_client(loop))
loop.close()错误:
>python echo_client.py
Request counter: 5
b'{"key":0}{"key":1}{"key":2}{"key":3}{"key":4}"END"'
Traceback (most recent call last):
File "echo_client.py", line 32, in <module>
loop.run_until_complete(async_client(loop))
File "C:\Program Files (x86)\Anaconda\lib\asyncio\base_events.py", line 587, in run_until_complete
return future.result()
File "echo_client.py", line 21, in async_client
r = orjson.loads(data)
orjson.JSONDecodeError: trailing characters at line 1 column 10: line 1 column 1 (char 0)发布于 2020-07-20 05:23:40
我认为问题要简单得多:writelines并不像你想象的那样工作。它不会插入换行符,它只是写入您给它的任何数据。这就是为什么您的客户端的readline()会将有效负载和"END"连接在一起。这也是为什么在另一个方向上需要write_eof的原因。
如果想要写一行,那么只需在有效负载后写一个换行符(字节)即可。您可以将其抽象到为您处理它的函数中:
async def write_msg(writer, msg):
writer.write(orjson.dumps(msg))
writer.write('\n')
await writer.drain()
async def read_msg(reader):
line = await reader.readline()
return orjson.loads(line)您可以在客户端和服务器上使用它们进行通信。
顺便说一句,您可能应该切换到较新的asyncio.run()应用程序接口,它创建并正确地销毁具有单个异步入口点的事件循环。您的服务器设置如下所示:
async def main():
await asyncio.start_server(handleClient, '127.0.0.1', 4000)
await server.wait_closed()
asyncio.run(main())https://stackoverflow.com/questions/62984212
复制相似问题