首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring Cloud Stream批量下发DLQ失败

Spring Cloud Stream批量下发DLQ失败
EN

Stack Overflow用户
提问于 2021-08-03 10:24:25
回答 1查看 419关注 0票数 1

正在尝试将Spring配置为在使用批处理模式时将错误消息发送到死信队列。但结果是在dlq主题中什么都没有。

我使用Spring Boot 2.5.3和Spring Cloud 2020.0.3。这会自动将spring-cloud-stream-binder-kafka-parent版本解析为3.1.3。

下面是application.properties:

代码语言:javascript
复制
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.bindings.input-in-0.destination=topic4
spring.cloud.stream.bindings.input-in-0.group=batch4
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=5 

下面是函数式编程模型中的应用程序和批处理侦听器:

代码语言:javascript
复制
@SpringBootApplication
public class DemoKafkaBatchErrorsApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaBatchErrorsApplication.class, args);
    }

    @Bean
    public Consumer<List<byte[]>> input() {
        return messages -> {

            for (int i = 0; i < messages.size(); i++) {

                throw new BatchListenerFailedException("Demo: failed to process = ", i);
            }
        };
    }

    @Bean
    public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate<String, byte[]> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 10));
    }
}

发送至主题:

代码语言:javascript
复制
./kafka-console-producer.sh --broker-list broker:9092 --topic topic4 < input.json

从DLQ读取:

代码语言:javascript
复制
./kafka-console-consumer.sh --bootstrap-server broker:9092 --topic topic4_ERR --from-beginning --max-messages 100

所以在运行这个应用程序后,我在dlq主题中什么也没有得到,但在控制台中有很多消息,比如:

代码语言:javascript
复制
Caused by: org.springframework.kafka.listener.BatchListenerFailedException: Demo: failed to process =  @-0
    at com.example.demokafkabatcherrors.DemoKafkaBatchErrorsApplication.lambda$input$0(DemoKafkaBatchErrorsApplication.java:29) ~[classes/:na]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489) ~[spring-cloud-function-context-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:560) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.2.jar:5.5.2]
    ... 27 common frames omitted

我做错了什么?

UPD:根据Gary的回答,我做了以下更改:

代码语言:javascript
复制
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BatchErrorHandler handler) {
        return ((container, destinationName, group) -> container.setBatchErrorHandler(handler));
    }

    @Bean
    public BatchErrorHandler batchErrorHandler(KafkaOperations<String, byte[]> kafkaOperations) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaOperations,
                (cr, e) -> new TopicPartition(cr.topic() + "_ERR", 0));
        return new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 3));
    }

一切都像符咒一样工作着

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-09 15:35:54

在使用spring-cloud-stream时,容器不是由Boot的容器工厂创建的,而是由绑定器创建的;错误处理程序@Bean不会自动连接进来。

您必须改为配置ListenerContainerCustomizer @Bean

示例如下:Can I apply graceful shutdown when using Spring Cloud Stream Kafka 3.0.3.RELEASE?

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68634379

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档