我使用spring的SeekToCurrentErrorHandler和DeadLetterPublishingRecoverer。
每次调用错误处理程序时都会记录一条消息,并将一个事件发送到我们的监视系统。
我看到的问题是,对于连续记录,错误处理程序被调用了太多次,导致了故障。
例如,对于不可还原的异常,如果生成2(错误) msgs,则会得到以下日志:
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication对于x使用者错误消息,我似乎得到了:对错误处理程序的x调用,对错误处理程序的x-1调用,对x-2调用,等等。
对于可检索的异常也是如此,我在每次重试时都看到了相同的情况。使用者函数被正确地调用,只有错误处理程序被多次触发。
这是我的错误处理配置:
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final Monitor monitor;
CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
super(dlpr, retries);
super.setLogLevel(KafkaException.Level.DEBUG);
this.monitor = monitor;
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (!records.isEmpty()) {
records.forEach(record -> {
logAndReportError(record);
});
}
super.handle(exception, records, consumer, container);
}
}
@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
seekToCurrent.addNotRetryableException(SomeFatalException.class);
return seekToCurrent;
}我有两个问题:
发布于 2020-10-22 16:28:10
您的问题不清楚,请添加您的代码和configuration.
https://stackoverflow.com/questions/64484191
复制相似问题