我需要每秒钟向用户发送来自rabbitMQ的消息。为此,我使用两个线程。第一个接收来自rabbitMQ的消息并将它们排队。第二个线程从队列中获取消息,对它们进行处理,并通过web套接字将它们发送给用户。我的问题是如何最好地实施这一机制。现在我的代码如下所示:
def __init__(self):
self.data = {}
self.queue = Queue()
def download_data(self):
started_time = time.perf_counter()
while True:
if time.perf_counter() - started_time >= 1:
started_time = time.perf_counter()
for _ in range(self.queue.qsize()):
self.append_data(self.queue.get())
self.sync_send_data_to_user(self.data)
self.data = {}
def message_handle(self, ch, method, properties, body):
message = json.loads(body)
self.queue.put(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consuming(self):
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
rabbit_queue = channel.queue_declare("to_client")
channel.basic_consume(on_message_callback=self.message_handle, queue="to_client")
threading.Thread(target=channel.start_consuming, args=[]).start()此代码工作,但有时它发送的消息超过1次每秒。第一个线程是rabbitMQ使用者回调函数message_handle,第二个线程是无限循环函数download_data。message_handle检查是否超过1秒,如果发生这种情况,message_handle将阻塞使用队列的线程,直到检索队列中的所有项为止。
更新:--我想我让自己变得更难了,最后我改变了一些逻辑。现在我有两个线程,其中一个处理来自rabbitmq的消息并将它们发送到队列,函数message_handle。python中的队列是线程安全的,所以它应该可以工作。第二个线程检查是否已过1秒,如果已从队列中检索所有数据并将其发送给用户,则为download_data函数。
发布于 2021-06-20 07:38:06
https://stackoverflow.com/questions/68037241
复制相似问题