首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >延迟处理死信队列

延迟处理死信队列
EN

Stack Overflow用户
提问于 2019-12-16 19:32:59
回答 1查看 1.6K关注 0票数 0

我想做以下工作:当一条消息失败并落入我的死信队列时,我想等待5分钟,并在我的队列中重新发布相同的消息。

今天,我使用Streams和RabbitMQ完成了以下代码基于这些文档

代码语言:javascript
复制
@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

我的MessageInputProcessor接口:

代码语言:javascript
复制
public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

我的财产档案:

代码语言:javascript
复制
#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true


#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

使用这段代码,我可以从死信队列中读取,捕获标题,但我不能将它放回队列中( LOGGER.warn("Retrying message, {} attempts", retriesHeader);行只运行一次,即使时间很慢)。

我的猜测是,bindOriginalToDelay方法将交换绑定到一个新队列,而不是我的。但是,我没有找到让队列绑定到那里的方法,而不是创建一个新的队列。但我甚至不确定这是不是个错误。

我也尝试过发送到MessageInputProcessor.INPUT,而不是MessageInputProcessor.INPUT_DESTINATION,但是它没有像预期的那样工作。

而且,不幸的是,由于对项目的依赖关系,我无法更新Spring框架.

过了一段时间,你能帮我把失败的消息放回我的队列吗?我真的不想把thread.sleep放在那里.

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-12-16 21:18:04

使用该配置,myInput.group被绑定到延迟(主题)交换myInput和路由密钥#

您可能应该删除spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true,因为您不需要延迟主交换。

它还将绑定到您的显式延迟交换,使用键myInput.group

在我看来,一切都是正确的;您应该看到绑定到两个交换的相同(单个)队列:

myInput.group.dlq用键myInput.group绑定到DLX

您应该设置一个较长的TTL,并检查DLQ中的消息,以确定是否有什么突出之处。

编辑

我只是以5秒的延迟复制了您的代码,它对我来说很好(在主交换机上关闭延迟)。

代码语言:javascript
复制
Retrying message, 4 attempts

代码语言:javascript
复制
added to failed messages queue

也许你认为它不起作用,因为你在主交易所也有一个延迟?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59363167

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档