我想做以下事情:我想做一个websocket,它打印所有关于esport的事件,我想使用https://sofascore.com --我像往常一样检查了网络请求,似乎我需要先发送一个Auth WebSocket内容,然后发送一个订阅正确的项目,然后我会收到我需要的事件。
我编写了以下代码:
import websockets
import asyncio
from websockets.extensions import permessage_deflate
async def esports():
async with websockets.connect('wss://ws.sofascore.com:9222/', compression='deflate') as websocket:
msg = await websocket.recv()
print(f"From Server: {msg}")
t = await websocket.send(
'CONNECT {"no_responders":true,"protocol":1,"verbose":false,"pedantic":false,"user":"none","pass":"none","lang":"nats.ws","version":"1.8.1","headers":true}')
await websocket.send("PING")
pong = await websocket.recv()
print(f"From Server: {pong}")
await websocket.send(
'SUB sport.esports 6')
while (True):
msg = await websocket.recv()
print(f"From Server: {msg}")
asyncio.get_event_loop().run_until_complete(esports())当我看到websocket的请求头时,我知道websocket被压缩为permessage_deflate。
但我还是有个错误:
Traceback (most recent call last):
File "C:\Users\Coding\Desktop\websockett.py", line 23, in <module>
asyncio.get_event_loop().run_until_complete(esports())
File "C:\Users\Coding\AppData\Local\Programs\Python\Python39-32\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "C:\Users\Coding\Desktop\websockett.py", line 15, in esports
await websocket.send(
File "C:\Users\Coding\AppData\Roaming\Python\Python39\site-packages\websockets\legacy\protocol.py", line 620, in send
await self.ensure_open()
File "C:\Users\Coding\AppData\Roaming\Python\Python39\site-packages\websockets\legacy\protocol.py", line 921, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: received 1008 (policy violation) Authentication Timeout; then sent 1008 (policy violation) Authentication Timeout
Process finished with exit code 1编辑:
我现在发现这一切都适用于Nats网络。是否有任何方法将Nats与也支持websockets的Libary一起使用?
不幸的是没有在github或pypi上找到一个..。
发布于 2022-09-20 21:29:20
理想情况下,您可以使用nats-py库
import asyncio
import nats
async def handler(msg):
print(f"From server: {msg}")
async def main():
nc = await nats.connect("wss://ws.sofascore.com:9222")
await nc.subscribe("sport.esports", cb=handler)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
try:
loop.run_forever()
finally:
loop.close()但是,这个库目前不支持与WebSockets连接,因此上面的库无法工作(目前看来它是正在研究中 )。
对于您的代码来说,它失败的唯一原因是您要发送的消息没有以\r\n结尾,这是NATS协议所需的。代码按照预期的方式处理此更改:
import asyncio
import websockets
async def esports():
async with websockets.connect('wss://ws.sofascore.com:9222') as websocket:
msg = await websocket.recv()
print(f"From Server: {msg}")
await websocket.send(
'CONNECT {"no_responders":true,"protocol":1,"verbose":false,"pedantic":false,"user":"none","pass":"none","lang":"nats.ws","version":"1.8.1","headers":true}\r\n'
)
await websocket.send("SUB sport.esports 1\r\n")
async for msg in websocket:
print(f"From Server: {msg}")
asyncio.run(esports())当然,这最终会被断开,因为它不响应PING消息。下面是一个更充实的脚本,它实现了足够多的NATS协议来记录sport.esports消息:
import asyncio
import json
import textwrap
from dataclasses import dataclass
import websockets
class SofaError(Exception):
pass
def message_string(message, data=None, pretty=False):
s = message
if data is not None:
if pretty:
s += json.dumps(data, indent=2)
else:
s += json.dumps(data, separators=(",", ":"))
return s
def log(pre, message, data=None):
print(textwrap.indent(message_string(message, data, True), pre))
def recv_log(message, data=None):
log("< ", message, data)
async def send(websocket, message, data=None):
log("> ", message, data)
data = (message_string(message, data, False) + "\r\n").encode()
await websocket.send(data)
async def connect_and_subscribe(websocket):
connect_options = {
"no_responders": True,
"protocol": 1,
"verbose": False,
"pedantic": False,
"user": "none",
"pass": "none",
"lang": "nats.ws",
"version": "1.8.1",
"headers": True,
}
await send(websocket, "CONNECT ", connect_options)
await send(websocket, "SUB sport.esports 1")
@dataclass
class NatsMsg:
subject: str
sid: str
reply_to: str
size: int
payload: bytes
def parse_msg(info_line, pending_data):
if not info_line:
raise SofaError("No payload information received")
info = [b.decode(errors="replace") for b in info_line.split(b" ")]
if len(info) == 3:
subject, sid, size = info
reply_to = None
elif len(info) == 4:
subject, sid, reply_to, size = info
else:
raise SofaError("Unrecognized info format")
try:
size = int(size)
except ValueError:
raise SofaError("Bad payload size")
if len(pending_data) < size:
raise SofaError("Incomplete payload")
payload = pending_data[:size]
pending_data = pending_data[size:]
return NatsMsg(subject, sid, reply_to, size, payload), pending_data
async def handler(websocket, ws_message, connected):
while len(ws_message):
nats_message, _, ws_message = ws_message.partition(b"\r\n")
if not nats_message:
continue
op, _, rest = nats_message.partition(b" ")
if op == b"-ERR":
recv_log(nats_message.decode(errors="replace"))
err = rest.strip(b"'").decode(errors="replace") if rest else "(No message received)"
raise SofaError(f"Server error: {err}")
elif op == b"INFO":
info_options = json.loads(rest) if rest else None
recv_log("INFO ", info_options)
if not connected:
await connect_and_subscribe(websocket)
connected = True
elif op == b"PING":
recv_log("PING")
await send(websocket, "PONG")
elif op == b"MSG":
try:
msg, ws_message = parse_msg(rest, ws_message)
except SofaError as e:
recv_log(f"MSG (Error: {e}) {rest}")
continue
msg_info = (
f"MSG subject={msg.subject} sid={msg.sid} "
f"reply-to={msg.reply_to} nbytes={msg.size}:\n"
)
try:
decoded = msg.payload.decode()
data = json.loads(decoded)
except UnicodeError:
recv_log(f"{msg_info}{msg.payload}")
except json.JSONDecodeError:
recv_log(f"{msg_info}{decoded}")
else:
recv_log(msg_info, data)
else:
recv_log(f"(Unhandled op) {nats_message.decode(errors='replace')}")
return connected
async def main():
async with websockets.connect("wss://ws.sofascore.com:9222") as websocket:
connected = False
async for message in websocket:
connected = await handler(websocket, message, connected)
if __name__ == "__main__":
asyncio.run(main())https://stackoverflow.com/questions/73720939
复制相似问题