我正在使用Spring与RabbitMQ一起创建应用程序。我为兔子创建了如下配置:
@Configuration
public class RabbitConfiguration {
public static final String RESEND_DISPOSAL_QUEUE = "RESEND_DISPOSAL";
@Bean
public Queue resendDisposalQueue() {
return new Queue(RESEND_DISPOSAL_QUEUE, true);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory (ConnectionFactory connectionFactoryr) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}此外,我还为兔子消息创建了如下监听器:
@RabbitListener(queues = RESEND_DISPOSAL_QUEUE)
public void getResendDisposalPayload(String messageBody){
LOGGER.info("[getResendDisposalPayload] message = {}", messageBody);
// And there is some business logic
}一切都很好,但有一个问题。当我在方法getResendDisposalPayload中得到异常时,它侦听RESEND_DISPOSAL_QUEUE队列(例如,数据库中的临时问题),兔子立即开始重新发送最后一个未处理的消息。它产生了大量的日志,出于某种原因,我的系统不舒服。
正如我在本文中所读到的,https://www.baeldung.com/spring-amqp-exponential-backoff“同时使用死信队列是处理失败消息的标准方法”。为了使用这种模式,我创建了RetryOperationsInterceptor,它定义了计数、尝试传递消息和尝试之间的延迟。例如:
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(3)
.recoverer(messageRecoverer)
.build();
}这听起来很不错,但只有一个问题:我不能在options maxAttempts中定义无穷大尝试量。在maxAttempts之后,我必须保存一些坏消息,并在将来处理它。它需要一些额外的代码。
问题是:是否有任何方法可以配置兔子,使其以某种延迟的方式来无限地重发中断的消息,比如一秒钟的延迟?
发布于 2021-10-20 15:44:18
兔子开始无延迟地重新发送最后未处理的消息
这就是重传的工作原理:它一次又一次地重新推送相同的消息,直到您手动地对其进行压缩或完全删除。在重发之间不会有任何延迟,仅仅是因为没有从队列中提取新的消息,直到对这个消息做了一些事情。
我无法在选项
maxAttempts中定义无穷大尝试量
你试过Integer.MAX_VALUE吗?相当多的尝试。
另一种方法是使用延迟的Exchange:https://docs.spring.io/spring-amqp/docs/current/reference/html/#delayed-message-exchange。
您可以使用RepublishMessageRecoverer配置重试,以便在尝试结束后将其发布回原始队列:https://docs.spring.io/spring-amqp/docs/current/reference/html/#async-listeners。
https://stackoverflow.com/questions/69594640
复制相似问题