首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >处理死信队列的配置

处理死信队列的配置
EN

Stack Overflow用户
提问于 2019-12-09 18:52:15
回答 1查看 2K关注 0票数 0

我有一个使用RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我绝不能失去任何信息。

为了尽量减少失败,我计划如下:

  • 对队列中的消息使用默认的重试方法
  • 配置死信队列,以便在一段时间后再将消息放到队列中。
  • 为了避免无限循环,只允许几次(比方说,5)消息可以从死信队列重新发布到常规消息队列。

我相信我可以使用下面的配置实现前两项:

代码语言:javascript
复制
#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=300000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

#input
spring.cloud.stream.bindings.myInput.destination=my-queue
spring.cloud.stream.bindings.myInput.group=my-group

但是,我找不到在这个参考指南上搜索如何做我想做的事情(大多数情况下,如何从死信队列中配置最大数量的重新发布)。我不能完全确定我是否在正确的路径上--也许我应该手动创建第二个队列并编写我想要的代码,并且只将死信留给完全失败的消息(我必须定期检查并手动处理这些消息,因为我的系统不应该丢失任何消息).

我对这些框架很陌生,我希望你能帮我配置我的.

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-12-09 19:18:00

这是关于兔子粘合剂的文档展示了如何在一些重试失败后将死信发布到停车场队列中。

代码语言:javascript
复制
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

第二个例子展示了如何使用延迟的exchange插件来延迟重试。

代码语言:javascript
复制
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59254958

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档