我有一个从Binance API接收数据(Trades)的程序。这些数据将在一个带有破折号和图形化的web应用程序中处理和可视化。
为了获得最好的性能和最轻微的延迟,我的程序有3个线程:
线程1- Binance API - get请求- Trades
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
...
def start_thread_1():
loop.run_until_complete(main(api_key,secret_key))
async def main(api_key,secret_key):
client = await AsyncClient.create(api_key,secret_key)
await trades_listener(client)
async def trades_listener(client):
bm = BinanceSocketManager(client)
symbol = 'BTCUSDT'
async with bm.trade_socket(symbol=symbol) as stream:
while True:
msg = await stream.recv()
event_type = msg['e']
...
trade = Trade(event_type,...)
# <-- safe trade SOMEWHERE to process in other thread ? safe to: process_trades_list线程2- Web应用程序-显示Trades和处理后的Trades数据
web-thread = threading.Thread(target=webserver.run_server)
...
not worth to mention线程3-处理数据-处理交易(计算RSI,过滤大交易,等等)
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
binance-thread = threading.Thread(target=start_thread_1)
web-thread = threading.Thread(target=webserver.run_server)
process-thread = threading.Thread(target=start_thread_3)
...
.start()
.sleep()
etc.
.join()
def start_thread_3():
process_trades()
def process_trades():
global process_trades_list
while True:
while len(process_trades_list) > 0:
trade = process_trades_list[0]
process_trades_list.pop(0)
# ...do calculation etc.如何安全/将数据从thread_1 /异步线程传递给thread_3?我试图把交易放到一个名为process_trades_list的列表中,然后循环while len(process_trades_list) > 0所有的交易。在循环I中,pop()处理列表中的交易--但这似乎以某种方式破坏了程序,而没有抛出错误。做这件事最好的方法是什么?
异步流可能会被新来的交易垃圾处理,我希望将负载降到最低。
发布于 2022-02-05 03:14:04
在这里,您需要一个queue.Queue而不是一个列表。最后一个代码片段如下所示:
import queue
if __name__ == "__main__":
try:
q = queue.Queue()
binance_thread = threading.Thread(target=start_thread_1,
args=(q,))
web_thread = threading.Thread(target=webserver.run_server)
process)thread = threading.Thread(target=process_trades,
args=(q,), daemon=True)
...
.start()
.sleep()
etc.
.join()
def process_trades(q):
while True:
trade = q.get()
# ...do calculation etc.我取消了对get_event_loop的调用,因为您没有使用返回的对象。我消除了start_thread_3函数,这是不必要的。
我使线程-3成为一个守护进程,因此如果所有其他操作都完成了,它将不会使应用程序保持打开状态。
队列应该在主线程中创建一次,并显式地传递给每个需要访问它的线程。这就消除了对全局变量的需求。
过程交易功能变得简单多了。q.get()调用会阻塞,直到对象可用为止。它还将对象从队列中弹出。
接下来,还必须修改线程-1,以便将对象放到队列中,如下所示:
def start_thread_1(q):
asyncio.run(main(api_key,secret_key, q))
async def main(api_key,secret_key, q):
client = await AsyncClient.create(api_key,secret_key)
await trades_listener(client, q)
async def trades_listener(client, q):
bm = BinanceSocketManager(client)
symbol = 'BTCUSDT'
async with bm.trade_socket(symbol=symbol) as stream:
while True:
msg = await stream.recv()
event_type = msg['e']
...
trade = Trade(event_type,...)
q.put(trade)q.put函数是您如何安全地将一个trade对象放入队列中,这将导致线程-3中的活动。
我修改了start_thread1函数:这里是为这个线程启动事件循环机制的好地方。
你会问如何避免垃圾邮件攻击你的程序。队列的方法允许您限制它们的大小,并且可能在交易满的情况下丢弃它们。
我不明白您要用线程-1中的if __name__ == '__main__'逻辑做什么。程序只能有一个入口点,只有一个模块名为'__main__'。在我看来,这必须是第三条线。
https://stackoverflow.com/questions/70988026
复制相似问题