首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Solana-py中创建websocket管理器

如何在Solana-py中创建websocket管理器
EN

Stack Overflow用户
提问于 2022-07-30 09:47:07
回答 1查看 34关注 0票数 1

我正在使用PythonV3.7+ solana V3.7+solana v0.21.0,并试图创建一个websocket管理器来处理几个不同的订阅,但是每当调用websocket.recv()asyncstdlib.enumerate(websocket)时,代码似乎都会阻塞。文档中的示例

代码语言:javascript
复制
import asyncio
from asyncstdlib import enumerate
from solana.rpc.websocket_api import connect


async def main():
    # First example using websocket.recv()
    async with connect("wss://api.devnet.solana.com") as websocket:
        await websocket.logs_subscribe()
        first_resp = await websocket.recv()
        subscription_id = first_resp.result
        next_resp = await websocket.recv()         <--- Blocks until msg received.
        print(next_resp)
        await websocket.logs_unsubscribe(subscription_id)


    # Second example using client as an infinite asynchronous iterator:
    async with connect("wss://api.devnet.solana.com") as websocket:  
        await websocket.logs_subscribe()
        first_resp = await websocket.recv()
        subscription_id = first_resp.result
        async for idx, msg in enumerate(websocket): <--- Blocks until msg received.
            if idx == 3:
                break
            print(msg)
        await websocket.logs_unsubscribe(subscription_id)

asyncio.run(main())

这个想法是能够迭代一个无限循环,这样就可以将新的订阅添加到websocket中,例如:

代码语言:javascript
复制
from solana.rpc.request_builder import LogsSubscribeFilter
from solana.rpc.websocket_api import connect
from asgiref.sync import sync_to_async
from solana.publickey import PublicKey
from time import sleep


async def websocket_manager(rpc: str):
  async with connect(rpc) as websocket:
        while True:
            active_pubkeys = await sync_to_async(get_my_active_pubkeys)()
            if active_pubkeys:
                # Add missing pubkeys
                for pubkey in active_pubkeys:
                    if ws.get(pubkey) in websocket.subscriptions:
                        continue

                    print(f"Subscribe to {pubkey}")
                    await websocket.logs_subscribe(LogsSubscribeFilter.mentions(PublicKey(pubkey)))
                    first_resp = await websocket.recv()
                    ws[pubkey] = first_resp.result # Maps the pubkey to the subscription ID

                # Delete non used subscriptions:
                for non_used_pubkey in set(active_pubkeys) ^ set(ws.keys()):
                    if non_used_pubkey in ws:
                        print(f"Delete subscription for pubkey #{non_used_pubkey}")
                        websocket.account_unsubscribe(ws[non_used_pubkey])
                        ws.pop(non_used_pubkey)

            # <-- HERE HOW TO ITERATE SUBSCRIPTIONS WITHOUT BLOCKING THE MANAGER????

            sleep(30)  # Sleep for 30 seconds

使用一个新线程或一个子进程来读取websocket消息,这样它们就不会阻塞主功能了,这样会安全吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-01 09:06:48

我明白了,只是需要使用task = asyncio.create_task(my_msg_listener_function(websocket, ws))来让另一个任务同时运行并处理websocket消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73174530

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档