首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有什么方法可以通过websocket使用web3py监听事件吗?

有什么方法可以通过websocket使用web3py监听事件吗?
EN

Ethereum用户
提问于 2021-05-15 14:04:07
回答 2查看 3.7K关注 0票数 1

我正在尝试通过我的节点的websocket服务器监听事件,但是我似乎找不到使用web3Py库来记录事件的方法。文档的异步实现基本上是一个线程循环,它每隔一段时间都会查询节点。如果我有一个单独的契约,甚至是要查看的话,这将是可行的,但是当有多个过滤器单独查看时,这可能非常消耗资源。

考虑到websocket日志过滤在web3py实现中可用,是否有一种方法来实现websocket日志过滤?

EN

回答 2

Ethereum用户

回答已采纳

发布于 2021-07-26 09:22:36

Web3.py不提供自动订阅的方法,因此我们必须手动侦听websocket,并使用rpc方法将订阅发送到日志:

代码语言:javascript
复制
import asyncio
import json

from web3 import Web3
from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC
import requests
from websockets import connect
from eth_abi import decode_single, decode_abi

adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed. 
session = requests.Session()
w3 = Web3(Web3.WebsocketProvider("ws://<Provider>"))
w3.middleware_onion.inject(geth_poa_middleware, layer=0) # only needed for PoA networks like BSC


async def get_event():
    async with connect("ws://localhost:8545") as ws:
        await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", {
                    "address": ['0x15c921AF5F49a42......'],
                    "topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]}))
        subscription_response = await ws.recv()
        print(subscription_response)
        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=60)
                decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:]))
                print(list(decoded))
                pass
            except:
                pass
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(get_event())

在获取日志的数据之后,我们使用eth_abi对它们进行解码,并提取日志数据。这是一个更好的选择,而不是创建一个web3合同和等待收据,以获得日志,我们将不得不根据主题过滤。

票数 4
EN

Ethereum用户

发布于 2023-02-09 13:49:31

您还可以使用web3代理提供程序并订阅事件。

Web3-代理提供者是一个python包,用于使用Socks和WebSocket代理连接到HTTP。

样本代码:

代码语言:javascript
复制
import asyncio
from Crypto.Hash import keccak
from web3 import Web3
from python_socks import ProxyType
from web3_proxy_providers import AsyncSubscriptionWebsocketWithProxyProvider

async def callback(subs_id: str, json_result):
    print(json_result)

async def main(loop: asyncio.AbstractEventLoop):
    provider = AsyncSubscriptionWebsocketProvider(
        loop=loop,
        endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
    )
    
    # subscribe to Deposit and Withdrawal events for WETH contract
    weth_contract_address = Web3.to_checksum_address('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2')
    deposit_topic = "0x" + keccak.new(data=b'Deposit(address,uint256)', digest_bits=256).hexdigest()
    withdrawal_topic = "0x" + keccak.new(data=b'Withdrawal(address,uint256)', digest_bits=256).hexdigest()
    subscription_id = await provider.subscribe(
        [
            'logs',
            {
                "address": weth_contract_address,
                "topics": [deposit_topic, withdrawal_topic]
            }
        ],
        callback
    )
    print(f'Subscribed with id {subscription_id}')
    print(f'Subscribed with id {subscription_id}')
    
    # unsubscribe after 30 seconds
    await asyncio.sleep(30)
    await provider.unsubscribe(subscription_id)

if __name__ == '__main__':
    async_loop = asyncio.get_event_loop()
    async_loop.run_until_complete(main(loop=async_loop))

免责声明:我是这个软件包https://github.com/sinarezaei/web3-proxy-providers的所有者

票数 0
EN
页面原文内容由Ethereum提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://ethereum.stackexchange.com/questions/98995

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档