我对RabbitMQ非常陌生。
我已经建立了一个“主题”交换。消费者可以在发布者之后启动。我希望消费者能够接收在启动之前已经发送的消息,而这些消息还没有被使用。
使用以下参数设置交换:
exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0使用此参数发布消息:
delivery_mode => 2消费者使用get()从交换中检索消息。
不幸的是,在任何客户端启动之前发布的任何消息都会丢失。我使用了不同的组合。
我想我的问题是交换不能保存消息。也许我需要在发布者和消费者之间建立一个队列。但这似乎不适用于“主题”交换,在这种交换中,消息是由一个键路由的。
我应该如何继续?我使用Perl绑定Net::RabbitMQ (无所谓)和RabbitMQ 2.2.0。
发布于 2011-05-28 01:29:34
如果在消息发布时没有连接的消费者可用来处理消息,则需要一个持久队列来存储消息。
交换不存储消息,但队列可以存储消息。令人困惑的是,交换可以被标记为“持久的”,但这实际上意味着,如果您重新启动代理,交换本身仍将存在,但它并不意味着发送到该交换的任何消息都会自动持久化。
考虑到这一点,这里有两个选择:
我会选择#1。可能没有太多的步骤要执行,你总是可以编写所需步骤的脚本,以便它们可以重复执行。另外,如果所有的消费者都从同一个队列中拉出(而不是每个用户都有一个专用队列),这实际上是最小的管理开销。
队列是需要适当管理和控制的东西。否则,您可能会最终导致流氓消费者声明持久队列,使用它们几分钟,但再也不会使用它们。不久之后,您将拥有一个永久增长的队列,而不会减少其大小,并且即将出现代理启示录。
发布于 2014-06-27 13:37:27
正如Brian提到的,交换不存储消息,主要负责将消息路由到另一个交换或队列。如果交换没有绑定到队列,则发送到该交换的所有消息都将“丢失”。
您不应该需要在发布服务器脚本中声明固定客户端队列,因为这可能是不可伸缩的。队列可以由发布者动态创建,并使用交换到交换绑定在内部进行路由。
RabbitMQ支持exchange到exchange绑定,这将允许拓扑灵活性、解耦和其他好处。你可以在RabbitMQ Exchange to Exchange Bindings [AMPQ]上阅读更多内容
RabbitMQ Exchange To Exchange Binding

使用queue创建具有持久性的交换到交换绑定的示例Python代码。
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')发布于 2021-06-02 10:14:59
你的案例似乎是“消息持久性”。
在RabbitMQ Tutorials docs中,您需要将queue和messages标记为持久的(下面的代码标记为C#版本。对于其他语言,您可以首选here)。
queue在RabbitMQ节点重新启动后仍然有效。为了做到这一点,我们需要声明它是持久的:channel.QueueDeclare(queue: "hello",
durable: true,
....);IBasicProperties.SetPersistent设置为true。var properties = channel.CreateBasicProperties();
properties.Persistent = true;https://stackoverflow.com/questions/6148381
复制相似问题