首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RabbitMQ和python queue.Queue.get()卡住了

RabbitMQ和python queue.Queue.get()卡住了
EN

Stack Overflow用户
提问于 2022-02-28 22:21:58
回答 1查看 154关注 0票数 0

我正在尝试将即将出现的数据复制到另一个queue.Queue()中,以便在另一个线程中完成其他事情。

代码语言:javascript
复制
def rgb_callback(ch, method, properties, body):
rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
READ_QUEUE.put(item=rgb_color_bytes, block=True)

以及这里的配置行

代码语言:javascript
复制
def start_rgb_consume_from_rabbitmq():
try:

    #       RABBITMQ PART       #

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
    # connection.add_callback_threadsafe(rgb_data_read_from_python_queue)
    rgb_channel = connection.channel()
    rgb_channel.queue_declare(queue=RGB_QUEUE)
    rgb_channel.queue_purge(queue=RGB_QUEUE)
    rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)

    rgb_channel.start_consuming()

except Exception as err:
    print("Exception :", err)
    rgb_channel.stop_consuming()

except KeyboardInterrupt:
    rgb_channel.stop_consuming()
    sys.exit(0)

最后,我失败的queue.Queue().get()函数:

代码语言:javascript
复制
def rgb_data_read_from_python_queue():
if STATUS2:
    cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)
rgb_color_bytes = None
while True:
    print("POINTER 1")
    try:
        rgb_frame = READ_QUEUE.get(block=True)
    except queue.Empty:
        rgb_frame = None

    if not rgb_frame:
        continue

    print("POINTER 2")

它停在那里。我是线程和队列架构方面的新手。我正在测试add_callbak_threadsafe(),并且我知道get()阻塞了线程。但是我在这里创建了两个不同的线程

代码语言:javascript
复制
rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
rgb_data_thread.start()
consumer_thread.start()

因此,如果我创建了两个线程,为什么queue.Queue().get()会阻塞另一个线程。谢谢你的帮助。我可以共享整个代码,它非常简单,几乎170行。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-01 11:01:28

在这里,我解决了这个问题,我想为那些试图将数据放到rabbitmq队列和使用者读取的人发布,然后将其放到python队列中,并在其他线程上做一些事情。我希望它能帮助到一些人。

代码语言:javascript
复制
#       RGB CONSUME     #

import numpy as np
import pika
import sys
import cv2
import queue
import threading

#       MACRO DEFINITIONS       #

RGB_QUEUE = 'RGBStream0'
WINDOW_TITLE = 'RGB Stream Consumer1'
WINDOW_TITLE2 = 'From Python Queue'
HOST_NAME = 'localhost'
READ_QUEUE = queue.Queue(200)
CONSUMER_THREAD_NAME = 'ConsumerThread'
THREAD_STOP_FLAG = False
TEST_FLAG = False


#       PARAMETER CHECK      #

# Status degiskeni ile, sadece dagitim yapilmasi yada dagitim ve displayin aynı anda yapilmasi durumu saklanmakta.
STATUS = None
STATUS2 = None
if len(sys.argv) > 1:
    if sys.argv[1] == '-display':
        STATUS = False
        STATUS2 = True
    else:
        print("Gecersiz parametre")
        exit(1)
else:
    pass


#       LOCAL FUNCTIONS     #

def rgb_callback(ch, method, properties, body):
    rgb_color_bytes = np.frombuffer(body, dtype=np.uint8)
    READ_QUEUE.put(item=rgb_color_bytes, block=True)
    print(rgb_color_bytes)


def start_rgb_consume_from_rabbitmq():
    try:

        #       RABBITMQ PART       #

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
        rgb_channel = connection.channel()
        rgb_channel.queue_declare(queue=RGB_QUEUE)
        rgb_channel.queue_purge(queue=RGB_QUEUE)
        rgb_channel.basic_consume(queue=RGB_QUEUE, on_message_callback=rgb_callback, auto_ack=True)
        if STATUS:
            cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_AUTOSIZE)

        if STATUS or STATUS2:
            print(' *** Mesajlar bekleniyor *** Goruntuleme acik *** Cikmak icin CTRL+C ***')
        else:
            print(' *** Mesajlar bekleniyor *** Goruntuleme icin -display *** Cikmak icin CTRL+C ***')

        rgb_channel.start_consuming()

    except Exception as err:
        print("Exception :", err)
        rgb_channel.stop_consuming()

    except KeyboardInterrupt:
        print('Interrupted ^^ Channel Kapatildi')
        rgb_channel.stop_consuming()
        sys.exit(0)


def rgb_data_read_from_python_queue():
    if STATUS2:
        cv2.namedWindow(WINDOW_TITLE2, cv2.WINDOW_AUTOSIZE)

    while True:
        rgb_frame = READ_QUEUE.get(block=True)
        # 640 * 480
        if rgb_frame.size == 921600:
            rgb_data_reshaped = np.reshape(rgb_frame, [480, 640, 3])
        # 1280 * 720
        elif rgb_frame.size == 2764800:
            rgb_data_reshaped = np.reshape(rgb_frame, [720, 1280, 3])
        # 1920 * 1080
        elif rgb_frame.size == 6220800:
            rgb_data_reshaped = np.reshape(rgb_frame, [1080, 1920, 3])
        else:
            print("Something wrong i can feel it")
            exit(1)

        if STATUS2:
            cv2.imshow(WINDOW_TITLE2, rgb_data_reshaped)
            cv2.waitKey(1)


try:
    rgb_data_thread = threading.Thread(target=rgb_data_read_from_python_queue)
    consumer_thread = threading.Thread(target=start_rgb_consume_from_rabbitmq)
    rgb_data_thread.start()
    consumer_thread.start()
except KeyboardInterrupt:
    print('Interrupted')
    cv2.destroyAllWindows()
    sys.exit(0)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71301684

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档