首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NATS WEBSOCKET Python WebSocket连接

NATS WEBSOCKET Python WebSocket连接
EN

Stack Overflow用户
提问于 2022-09-14 17:24:36
回答 1查看 122关注 0票数 2

我想做以下事情:我想做一个websocket,它打印所有关于esport的事件,我想使用https://sofascore.com --我像往常一样检查了网络请求,似乎我需要先发送一个Auth WebSocket内容,然后发送一个订阅正确的项目,然后我会收到我需要的事件。

我编写了以下代码:

代码语言:javascript
复制
import websockets
import asyncio
from websockets.extensions import permessage_deflate


async def esports():
    async with websockets.connect('wss://ws.sofascore.com:9222/', compression='deflate') as websocket:
        msg = await websocket.recv()
        print(f"From Server: {msg}")
        t = await websocket.send(
            'CONNECT {"no_responders":true,"protocol":1,"verbose":false,"pedantic":false,"user":"none","pass":"none","lang":"nats.ws","version":"1.8.1","headers":true}')
        await websocket.send("PING")
        pong = await websocket.recv()
        print(f"From Server: {pong}")
        await websocket.send(
            'SUB sport.esports 6')
        while (True):
            msg = await websocket.recv()
            print(f"From Server: {msg}")



asyncio.get_event_loop().run_until_complete(esports())

当我看到websocket的请求头时,我知道websocket被压缩为permessage_deflate。

但我还是有个错误:

代码语言:javascript
复制
Traceback (most recent call last):
  File "C:\Users\Coding\Desktop\websockett.py", line 23, in <module>
    asyncio.get_event_loop().run_until_complete(esports())
  File "C:\Users\Coding\AppData\Local\Programs\Python\Python39-32\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "C:\Users\Coding\Desktop\websockett.py", line 15, in esports
    await websocket.send(
  File "C:\Users\Coding\AppData\Roaming\Python\Python39\site-packages\websockets\legacy\protocol.py", line 620, in send
    await self.ensure_open()
  File "C:\Users\Coding\AppData\Roaming\Python\Python39\site-packages\websockets\legacy\protocol.py", line 921, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: received 1008 (policy violation) Authentication Timeout; then sent 1008 (policy violation) Authentication Timeout

Process finished with exit code 1

编辑:

我现在发现这一切都适用于Nats网络。是否有任何方法将Nats与也支持websockets的Libary一起使用?

不幸的是没有在github或pypi上找到一个..。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-09-20 21:29:20

理想情况下,您可以使用nats-py库

代码语言:javascript
复制
import asyncio
import nats


async def handler(msg):
    print(f"From server: {msg}")


async def main():
    nc = await nats.connect("wss://ws.sofascore.com:9222")
    await nc.subscribe("sport.esports", cb=handler)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    try:
        loop.run_forever()
    finally:
        loop.close()

但是,这个库目前不支持与WebSockets连接,因此上面的库无法工作(目前看来它是正在研究中 )。

对于您的代码来说,它失败的唯一原因是您要发送的消息没有以\r\n结尾,这是NATS协议所需的。代码按照预期的方式处理此更改:

代码语言:javascript
复制
import asyncio
import websockets


async def esports():
    async with websockets.connect('wss://ws.sofascore.com:9222') as websocket:
        msg = await websocket.recv()
        print(f"From Server: {msg}")

        await websocket.send(
            'CONNECT {"no_responders":true,"protocol":1,"verbose":false,"pedantic":false,"user":"none","pass":"none","lang":"nats.ws","version":"1.8.1","headers":true}\r\n'
        )

        await websocket.send("SUB sport.esports 1\r\n")

        async for msg in websocket:
            print(f"From Server: {msg}")


asyncio.run(esports())

当然,这最终会被断开,因为它不响应PING消息。下面是一个更充实的脚本,它实现了足够多的NATS协议来记录sport.esports消息:

代码语言:javascript
复制
import asyncio
import json
import textwrap

from dataclasses import dataclass

import websockets


class SofaError(Exception):
    pass


def message_string(message, data=None, pretty=False):
    s = message
    if data is not None:
        if pretty:
            s += json.dumps(data, indent=2)
        else:
            s += json.dumps(data, separators=(",", ":"))
    return s


def log(pre, message, data=None):
    print(textwrap.indent(message_string(message, data, True), pre))


def recv_log(message, data=None):
    log("< ", message, data)


async def send(websocket, message, data=None):
    log("> ", message, data)
    data = (message_string(message, data, False) + "\r\n").encode()
    await websocket.send(data)


async def connect_and_subscribe(websocket):
    connect_options = {
        "no_responders": True,
        "protocol": 1,
        "verbose": False,
        "pedantic": False,
        "user": "none",
        "pass": "none",
        "lang": "nats.ws",
        "version": "1.8.1",
        "headers": True,
    }
    await send(websocket, "CONNECT ", connect_options)
    await send(websocket, "SUB sport.esports 1")


@dataclass
class NatsMsg:
    subject: str
    sid: str
    reply_to: str
    size: int
    payload: bytes


def parse_msg(info_line, pending_data):
    if not info_line:
        raise SofaError("No payload information received")

    info = [b.decode(errors="replace") for b in info_line.split(b" ")]

    if len(info) == 3:
        subject, sid, size = info
        reply_to = None
    elif len(info) == 4:
        subject, sid, reply_to, size = info
    else:
        raise SofaError("Unrecognized info format")

    try:
        size = int(size)
    except ValueError:
        raise SofaError("Bad payload size")

    if len(pending_data) < size:
        raise SofaError("Incomplete payload")

    payload = pending_data[:size]
    pending_data = pending_data[size:]

    return NatsMsg(subject, sid, reply_to, size, payload), pending_data


async def handler(websocket, ws_message, connected):
    while len(ws_message):
        nats_message, _, ws_message = ws_message.partition(b"\r\n")
        if not nats_message:
            continue

        op, _, rest = nats_message.partition(b" ")

        if op == b"-ERR":
            recv_log(nats_message.decode(errors="replace"))
            err = rest.strip(b"'").decode(errors="replace") if rest else "(No message received)"
            raise SofaError(f"Server error: {err}")

        elif op == b"INFO":
            info_options = json.loads(rest) if rest else None
            recv_log("INFO ", info_options)

            if not connected:
                await connect_and_subscribe(websocket)
                connected = True

        elif op == b"PING":
            recv_log("PING")
            await send(websocket, "PONG")

        elif op == b"MSG":
            try:
                msg, ws_message = parse_msg(rest, ws_message)
            except SofaError as e:
                recv_log(f"MSG (Error: {e}) {rest}")
                continue

            msg_info = (
                f"MSG subject={msg.subject} sid={msg.sid} "
                f"reply-to={msg.reply_to} nbytes={msg.size}:\n"
            )

            try:
                decoded = msg.payload.decode()
                data = json.loads(decoded)
            except UnicodeError:
                recv_log(f"{msg_info}{msg.payload}")
            except json.JSONDecodeError:
                recv_log(f"{msg_info}{decoded}")
            else:
                recv_log(msg_info, data)

        else:
            recv_log(f"(Unhandled op) {nats_message.decode(errors='replace')}")

    return connected


async def main():
    async with websockets.connect("wss://ws.sofascore.com:9222") as websocket:
        connected = False
        async for message in websocket:
            connected = await handler(websocket, message, connected)


if __name__ == "__main__":
    asyncio.run(main())
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73720939

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档