首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >进程异步导致另一个线程-应用程序体系结构(python-3.7)

进程异步导致另一个线程-应用程序体系结构(python-3.7)
EN

Stack Overflow用户
提问于 2022-02-04 14:30:01
回答 1查看 100关注 0票数 0

我有一个从Binance API接收数据(Trades)的程序。这些数据将在一个带有破折号和图形化的web应用程序中处理和可视化。

为了获得最好的性能和最轻微的延迟,我的程序有3个线程:

线程1- Binance API - get请求- Trades

代码语言:javascript
复制
if __name__ == "__main__":
    try:
         loop = asyncio.get_event_loop()
         binance-thread = threading.Thread(target=start_thread_1)
         ...

def start_thread_1():
     loop.run_until_complete(main(api_key,secret_key))

async def main(api_key,secret_key):
     client = await AsyncClient.create(api_key,secret_key)
     await trades_listener(client)

async def trades_listener(client):
    bm = BinanceSocketManager(client)
    symbol = 'BTCUSDT'
    async with bm.trade_socket(symbol=symbol) as stream:
        while True:
            msg = await stream.recv()

            event_type = msg['e']
            ...
            trade = Trade(event_type,...)
            # <-- safe trade SOMEWHERE to process in other thread ? safe to: process_trades_list

线程2- Web应用程序-显示Trades和处理后的Trades数据

代码语言:javascript
复制
web-thread = threading.Thread(target=webserver.run_server)
...
not worth to mention

线程3-处理数据-处理交易(计算RSI,过滤大交易,等等)

代码语言:javascript
复制
if __name__ == "__main__":
    try:
         loop = asyncio.get_event_loop()
         binance-thread = threading.Thread(target=start_thread_1)
         web-thread = threading.Thread(target=webserver.run_server)
         process-thread = threading.Thread(target=start_thread_3)
         ...
         .start()
         .sleep()
         etc.
         .join()

def start_thread_3():
    process_trades()

def process_trades():
    global process_trades_list
    while True:
        while len(process_trades_list) > 0:
            trade = process_trades_list[0]
            process_trades_list.pop(0)
            # ...do calculation etc.

如何安全/将数据从thread_1 /异步线程传递给thread_3?我试图把交易放到一个名为process_trades_list的列表中,然后循环while len(process_trades_list) > 0所有的交易。在循环I中,pop()处理列表中的交易--但这似乎以某种方式破坏了程序,而没有抛出错误。做这件事最好的方法是什么?

异步流可能会被新来的交易垃圾处理,我希望将负载降到最低。

EN

回答 1

Stack Overflow用户

发布于 2022-02-05 03:14:04

在这里,您需要一个queue.Queue而不是一个列表。最后一个代码片段如下所示:

代码语言:javascript
复制
import queue

if __name__ == "__main__":
    try:
         q = queue.Queue()
         binance_thread = threading.Thread(target=start_thread_1,
                                           args=(q,))
         web_thread = threading.Thread(target=webserver.run_server)
         process)thread = threading.Thread(target=process_trades, 
                                           args=(q,), daemon=True)
         ...
         .start()
         .sleep()
         etc.
         .join()

def process_trades(q):
    while True:
        trade = q.get()
        # ...do calculation etc.

我取消了对get_event_loop的调用,因为您没有使用返回的对象。我消除了start_thread_3函数,这是不必要的。

我使线程-3成为一个守护进程,因此如果所有其他操作都完成了,它将不会使应用程序保持打开状态。

队列应该在主线程中创建一次,并显式地传递给每个需要访问它的线程。这就消除了对全局变量的需求。

过程交易功能变得简单多了。q.get()调用会阻塞,直到对象可用为止。它还将对象从队列中弹出。

接下来,还必须修改线程-1,以便将对象放到队列中,如下所示:

代码语言:javascript
复制
def start_thread_1(q):
     asyncio.run(main(api_key,secret_key, q))

async def main(api_key,secret_key, q):
     client = await AsyncClient.create(api_key,secret_key)
     await trades_listener(client, q)

async def trades_listener(client, q):
    bm = BinanceSocketManager(client)
    symbol = 'BTCUSDT'
    async with bm.trade_socket(symbol=symbol) as stream:
        while True:
            msg = await stream.recv()

            event_type = msg['e']
            ...
            trade = Trade(event_type,...)
            q.put(trade)

q.put函数是您如何安全地将一个trade对象放入队列中,这将导致线程-3中的活动。

我修改了start_thread1函数:这里是为这个线程启动事件循环机制的好地方。

你会问如何避免垃圾邮件攻击你的程序。队列的方法允许您限制它们的大小,并且可能在交易满的情况下丢弃它们。

我不明白您要用线程-1中的if __name__ == '__main__'逻辑做什么。程序只能有一个入口点,只有一个模块名为'__main__'。在我看来,这必须是第三条线。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70988026

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档