首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring-kafka错误处理程序被多次调用。

spring-kafka错误处理程序被多次调用。
EN

Stack Overflow用户
提问于 2020-10-22 14:15:19
回答 1查看 259关注 0票数 0

我使用spring的SeekToCurrentErrorHandlerDeadLetterPublishingRecoverer

每次调用错误处理程序时都会记录一条消息,并将一个事件发送到我们的监视系统。

我看到的问题是,对于连续记录,错误处理程序被调用了太多次,导致了故障。

例如,对于不可还原的异常,如果生成2(错误) msgs,则会得到以下日志:

代码语言:javascript
复制
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication

对于x使用者错误消息,我似乎得到了:对错误处理程序的x调用,对错误处理程序的x-1调用,对x-2调用,等等。

对于可检索的异常也是如此,我在每次重试时都看到了相同的情况。使用者函数被正确地调用,只有错误处理程序被多次触发。

这是我的错误处理配置:

代码语言:javascript
复制
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
    private final Monitor monitor;

    CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
        super(dlpr, retries);
        super.setLogLevel(KafkaException.Level.DEBUG);
        this.monitor = monitor;
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        if (!records.isEmpty()) {
            records.forEach(record -> {
                logAndReportError(record);
            });
        }
        super.handle(exception, records, consumer, container);
    }
}

@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
    var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
   seekToCurrent.addNotRetryableException(SomeFatalException.class);
   return seekToCurrent;
}

我有两个问题:

  1. 为什么会多次触发错误处理程序?
  2. 为什么要为不可检索的异常执行
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-22 16:28:10

您的问题不清楚,请添加您的代码和configuration.

  1. 我们必须在失败的记录之后寻找记录,以便在下一次投票中重新发布。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64484191

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档