我正在尝试通过我的节点的websocket服务器监听事件,但是我似乎找不到使用web3Py库来记录事件的方法。文档的异步实现基本上是一个线程循环,它每隔一段时间都会查询节点。如果我有一个单独的契约,甚至是要查看的话,这将是可行的,但是当有多个过滤器单独查看时,这可能非常消耗资源。
考虑到websocket日志过滤在web3py实现中可用,是否有一种方法来实现websocket日志过滤?
发布于 2021-07-26 09:22:36
Web3.py不提供自动订阅的方法,因此我们必须手动侦听websocket,并使用rpc方法将订阅发送到日志:
import asyncio
import json
from web3 import Web3
from web3.middleware import geth_poa_middleware # only needed for PoA networks like BSC
import requests
from websockets import connect
from eth_abi import decode_single, decode_abi
adapter = requests.sessions.HTTPAdapter(pool_connections=50000, pool_maxsize=50000) # pool connections and max size are for HTTP calls only, since we are using WS they are not needed.
session = requests.Session()
w3 = Web3(Web3.WebsocketProvider("ws://<Provider>"))
w3.middleware_onion.inject(geth_poa_middleware, layer=0) # only needed for PoA networks like BSC
async def get_event():
async with connect("ws://localhost:8545") as ws:
await ws.send(json.dumps({"id": 1, "method": "eth_subscribe", "params": ["logs", {
"address": ['0x15c921AF5F49a42......'],
"topics": [w3.keccak(text="Sync(uint112,uint112)").hex()]}]}))
subscription_response = await ws.recv()
print(subscription_response)
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout=60)
decoded = decode_single('(uint112,uint112)',bytearray.fromhex(json.loads(message)['params']['result']['data'][2:]))
print(list(decoded))
pass
except:
pass
if __name__ == "__main__":
loop = asyncio.get_event_loop()
while True:
loop.run_until_complete(get_event())在获取日志的数据之后,我们使用eth_abi对它们进行解码,并提取日志数据。这是一个更好的选择,而不是创建一个web3合同和等待收据,以获得日志,我们将不得不根据主题过滤。
发布于 2023-02-09 13:49:31
您还可以使用web3代理提供程序并订阅事件。
Web3-代理提供者是一个python包,用于使用Socks和WebSocket代理连接到HTTP。
样本代码:
import asyncio
from Crypto.Hash import keccak
from web3 import Web3
from python_socks import ProxyType
from web3_proxy_providers import AsyncSubscriptionWebsocketWithProxyProvider
async def callback(subs_id: str, json_result):
print(json_result)
async def main(loop: asyncio.AbstractEventLoop):
provider = AsyncSubscriptionWebsocketProvider(
loop=loop,
endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
)
# subscribe to Deposit and Withdrawal events for WETH contract
weth_contract_address = Web3.to_checksum_address('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2')
deposit_topic = "0x" + keccak.new(data=b'Deposit(address,uint256)', digest_bits=256).hexdigest()
withdrawal_topic = "0x" + keccak.new(data=b'Withdrawal(address,uint256)', digest_bits=256).hexdigest()
subscription_id = await provider.subscribe(
[
'logs',
{
"address": weth_contract_address,
"topics": [deposit_topic, withdrawal_topic]
}
],
callback
)
print(f'Subscribed with id {subscription_id}')
print(f'Subscribed with id {subscription_id}')
# unsubscribe after 30 seconds
await asyncio.sleep(30)
await provider.unsubscribe(subscription_id)
if __name__ == '__main__':
async_loop = asyncio.get_event_loop()
async_loop.run_until_complete(main(loop=async_loop))免责声明:我是这个软件包https://github.com/sinarezaei/web3-proxy-providers的所有者
https://ethereum.stackexchange.com/questions/98995
复制相似问题