
在消息传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题可能导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送。
但如果是程序逻辑引起的错误,那么多次重试也是没有用的,当然也可以设置重试次数。
这里的 retry(重试)机制通常指的是 消息从队列发送给消费者后,消费失败或者未 ack 时触发的重发。而不是生产者发送给交换机时候失败触发的,这种触发是发布确认机制来解决的。
💥要启用 消费者消息重试机制,必须打开消息确认机制中的 AUTO 才行!
模式 | Spring自动重试机制 | 说明 |
|---|---|---|
NONE | ❌ 无法重试(已确认) | RabbitMQ立即确认消息 |
AUTO | ✅ 生效 | Spring 捕获异常自动重试 |
MANUAL | ❌ 不生效 | 需自己实现重试逻辑 |
spring:
rabbitmq:
addresses: amqp://liren:123123@127.0.0.1/lirendada
listener:
simple:
acknowledge-mode: auto # 消息确认设置为auto,如果处理消息出现异常,会自动进行重试
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时长为5秒
max-attempts: 5 # 最大重试次数(包括自身消费的一次)首先是常量类:
// 重试机制
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";
public static final String RETRY_QUEUE = "retry_queue";然后配置以及绑定交换机和队列:
// 重试机制
@Bean("retryQueue")
public Queue retryQueue() {
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryExchange")
public DirectExchange retryExchange() {
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE_NAME).durable(true).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue")Queue queue,
@Qualifier("retryExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
}2. 发送消息
@RequestMapping("/retry")
public String retry() {
rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE_NAME, "retry", "retry test...");
return "发送成功!";
}2. 消费消息
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RetryQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message) throws Exception {
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
int num = 3/0;
System.out.println("处理完成");
}
}3. 运行程序,观察结果
http://127.0.0.1:8080/product/retry

但是如果对异常进行捕获了,那么就不会进行重试!
将消息确认机制改为手动 manual 模式,然后修改消费者代码:
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s,deliveryTag:%d \n", new String(message.getBody()), deliveryTag);
try {
// 模拟出现异常
int a = 3 / 0;
System.out.println("处理完成!");
channel.basicAck(deliveryTag, true); // 手动确认一下
} catch (Exception e) {
System.out.println("出现异常!");
Thread.sleep(1000);
channel.basicNack(deliveryTag, true, true); // 设置重新入队
}
}
可以看到,手动确认模式时,重试次数的限制不会像在自动确认模式下那样直接生效,因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现。
自动确认模式下,RabbitMQ 会在消息被投递给消费者后自动确认消息。如果消费者处理消息时抛出异常,RabbitMQ 根据配置的重试参数自动将消息重新入队,从而实现重试。重试次数和重试间隔等参数可以直接在 RabbitMQ 的配置中设定,并且 RabbitMQ 会负责执行这些重试策略。
而在手动确认模式下,消费者需要显式地对消息进行确认。如果消费者在处理消息时遇到异常,可以选择不确认消息使消息可以重新入队。重试的控制权在于应用程序本身,而不是 RabbitMQ 的内部机制。应用程序可以通过自己的逻辑和利用 RabbitMQ 的高级特性来实现有效的重试策略。
💡 使用重试机制时需要注意:
unacked 的状态,导致消息积压。TTL(Time to Live)即过期时间。RabbitMQ 可以对消息和队列设置 TTL。
当消息达到存活时间之后,若还没有被消费,就会被自动清除。
咱们在网上购物,经常会遇到一个场景,当下单超过24小时还未付款,订单会被自动取消。还有类似的,申请退款之后,超过7天未被处理,则自动退款。

有两种方法设置消息的TTL:
# 配置文件
arguments:
x-message-ttl: 60000 # 毫秒2. 消息级 TTL(每条消息单独设置)
MessageProperties props = new MessageProperties();
props.setExpiration("60000"); // 毫秒如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。
针对每条消息设置 TTL 的方法是在发送消息的方法中设置 expiration 属性参数,单位为毫秒。
// TTL
public static final String TTL_TIME = "10000"; // 10s
public static final String TTL_EXCHANGE_NAME = "ttl_exchange";
public static final String TTL_QUEUE = "ttl_queue";2. 配置以及绑定交换机与队列:
// TTL
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constants.TTL_QUEUE).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange() {
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE_NAME).durable(true).build();
}
@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue")Queue queue,
@Qualifier("ttlExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}3. 发送消息:
@RequestMapping("/ttl")
public String ttl() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(Constants.TTL_TIME);
return message;
}
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE_NAME, "ttl", "ttl test...", messagePostProcessor);
return "发送成功!";
}
// 另一种写法:因为 MessagePostProcessor 是函数式接口,所以可以用lambda简化
@RequestMapping("/ttl")
public String ttl() {
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE_NAME, "ttl", "ttl test...", message -> {
message.getMessageProperties().setExpiration(Constants.TTL_TIME);
return message;
});
return "发送成功!";
}发送消息后可以到管理页面观察队列,可以发现 10s 后队列中的消息就消失了!
直接在创建队列的时候使用封装好的 ttl() 方法即可设置队列中的消息过期时间,单位是毫秒。
@Bean("ttlQueue2")
public Queue ttlQueue2() {
return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(10000).build();
}实际上设置队列 TTL 的原理,是在创建队列时加入 x-message-ttl 参数实现的,下面是源码:

为什么这两种方法处理的方式不一样???
因为 RabbitMQ 的队列不是扫描式的,而是顺序读取式队列。
消息存放在队列中时,只有在它到达队列头部(准备被投递给消费者)时,RabbitMQ 才会检查它是否已过期。
换句话说:RabbitMQ 不会遍历整个队列去找 "已经过期" 的消息。只有 "轮到要被投递的消息" 时,才判断是否过期。
因此两者处理结果不同:
死信就是因为种种原因,而导致的无法被消费的信息。
有死信,自然就有死信队列。当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX(Dead Letter Exchange),绑定 DLX 的队列,就称为死信队列 DLQ(Dead Letter Queue)。

消息变成死信通常有以下几种可能:
Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false包含两部分:
死信交换机/队列 和 普通的交换机/队列 没有区别,只是处理的事情不同罢了!
首先是常量类:
// 死信
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DL_EXCHANGE = "dl_exchange";
public static final String DL_QUEUE = "dl_queue";然后声明以及配置交换机和队列:
// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder
.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dlk") // 绑定死信路由键
.ttl(10000) // 过期时间设置10s,方便测试
.maxLength(10L) // 队列最大长度设为10,方便测试
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue")Queue queue,
@Qualifier("normalExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
// 死信队列
@Bean("dlQueue")
public Queue dlQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue")Queue queue,
@Qualifier("dlExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("dlk").noargs();
}@RequestMapping("/dlx")
public String dlx() {
// 1. 测试过期时间, 当时间达到TTL, 消息自动进入到死信队列
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dlx test...");
// 2. 测试队列长度溢出,消息自动进入到死信队列
for(int i = 0; i < 20; ++i) {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dlx test...");
}
return "发送成功!";
}
D:队列设置了持久化机制TTL:队列设置了消息过期时间Lim:队列设置了长度(x-max-length)DLX:队列设置了死信交换机(x-dead-letter-exchange)DLK:队列设置了死信路由键(x-dead-letter-routing-key)
发送消息:http://127.0.0.1:8080/product/dlx
发送之后:

10秒后,消息进入到死信队列:

生产者首先发送一条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中。由于队列 normal_queue 设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息过期。由于设置了 DLX,过期之时,消息会被丢给交换器(dl_exchange)中,这时根据 RoutingKey 匹配,找到匹配的队列(dl_queue),最后消息被存储在 queue.dlx 这个死信队列中。
队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列
发送前,死信队列只有一条数据:

运行后,可以看到死信队列变成了 11 条:

过期之后,正常队列的 10 条也会进入到死信队列:

写消费者代码,并强制异常,测试拒绝签收:
@Component
public class DLListener {
// 监听正常队列
@RabbitListener(queues = Constants.NORMAL_QUEUE)
public void normalQueue(Message message, Channel channel) throws InterruptedException, IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息: %s, deliveryTag: %d\n", new String(message.getBody()), deliveryTag);
// 模拟处理失败
int num = 3/0;
System.out.println("处理完成");
// 手动确认
channel.basicAck(deliveryTag, true);
}catch (Exception e){
// 第三个参数requeue决定是否重新入队,如果为true,则会重新发送;若为false,则直接丢弃,若此时设置了死信,会进入到死信队列
channel.basicNack(deliveryTag, true,false);
}
}
// 监听死信队列
@RabbitListener(queues = Constants.DL_QUEUE)
public void dlQueue(Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("死信队列接收到消息: %s, deliveryTag: %d\n", new String(message.getBody()), deliveryTag);
}
}
死信就是因为种种原因,而导致的无法被消费的信息。
Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false对于 RabbitMQ 来说,死信队列是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
比如:用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。
为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列中,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。
场景的应用场景还有:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。