首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RabbitMQ:具有主题交换的持久消息

RabbitMQ:具有主题交换的持久消息
EN

Stack Overflow用户
提问于 2011-05-27 13:57:32
回答 3查看 26.4K关注 0票数 72

我对RabbitMQ非常陌生。

我已经建立了一个“主题”交换。消费者可以在发布者之后启动。我希望消费者能够接收在启动之前已经发送的消息,而这些消息还没有被使用。

使用以下参数设置交换:

代码语言:javascript
复制
exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

使用此参数发布消息:

代码语言:javascript
复制
delivery_mode => 2

消费者使用get()从交换中检索消息。

不幸的是,在任何客户端启动之前发布的任何消息都会丢失。我使用了不同的组合。

我想我的问题是交换不能保存消息。也许我需要在发布者和消费者之间建立一个队列。但这似乎不适用于“主题”交换,在这种交换中,消息是由一个键路由的。

我应该如何继续?我使用Perl绑定Net::RabbitMQ (无所谓)和RabbitMQ 2.2.0

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2011-05-28 01:29:34

如果在消息发布时没有连接的消费者可用来处理消息,则需要一个持久队列来存储消息。

交换不存储消息,但队列可以存储消息。令人困惑的是,交换可以被标记为“持久的”,但这实际上意味着,如果您重新启动代理,交换本身仍将存在,但它并不意味着发送到该交换的任何消息都会自动持久化。

考虑到这一点,这里有两个选择:

  1. 在启动发布器以自己创建队列之前,先执行a administrative 。您可以使用web UI或命令行工具来执行此操作。确保您将其创建为持久队列,以便它将存储路由到它的任何消息,即使没有活动的consumers.
  2. Assuming,您的消费者被编码为总是在启动时声明(并因此自动创建)它们的交换和队列(并且它们声明它们是持久的),在启动任何发布器之前,至少运行您的所有消费者一次。这将确保正确创建所有队列。然后,您可以关闭使用者,直到真正需要它们为止,因为队列将持久地存储路由到它们的任何未来消息。

我会选择#1。可能没有太多的步骤要执行,你总是可以编写所需步骤的脚本,以便它们可以重复执行。另外,如果所有的消费者都从同一个队列中拉出(而不是每个用户都有一个专用队列),这实际上是最小的管理开销。

队列是需要适当管理和控制的东西。否则,您可能会最终导致流氓消费者声明持久队列,使用它们几分钟,但再也不会使用它们。不久之后,您将拥有一个永久增长的队列,而不会减少其大小,并且即将出现代理启示录。

票数 81
EN

Stack Overflow用户

发布于 2014-06-27 13:37:27

正如Brian提到的,交换不存储消息,主要负责将消息路由到另一个交换或队列。如果交换没有绑定到队列,则发送到该交换的所有消息都将“丢失”。

您不应该需要在发布服务器脚本中声明固定客户端队列,因为这可能是不可伸缩的。队列可以由发布者动态创建,并使用交换到交换绑定在内部进行路由。

RabbitMQ支持exchange到exchange绑定,这将允许拓扑灵活性、解耦和其他好处。你可以在RabbitMQ Exchange to Exchange Bindings [AMPQ]上阅读更多内容

RabbitMQ Exchange To Exchange Binding

使用queue创建具有持久性的交换到交换绑定的示例Python代码。

代码语言:javascript
复制
#!/usr/bin/env python
import pika
import sys
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
 
 
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
 
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
 
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
 
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
 
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
票数 20
EN

Stack Overflow用户

发布于 2021-06-02 10:14:59

你的案例似乎是“消息持久性”。

RabbitMQ Tutorials docs中,您需要将queuemessages标记为持久的(下面的代码标记为C#版本。对于其他语言,您可以首选here)。

  1. 首先,在Publisher中,您需要确保queueRabbitMQ节点重新启动后仍然有效。为了做到这一点,我们需要声明它是持久的:

代码语言:javascript
复制
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     ....);

  1. 其次,在消费者中,您需要将消息标记为持久消息-通过将IBasicProperties.SetPersistent设置为true。

代码语言:javascript
复制
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/6148381

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档