我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,kafka监听器消费来自main topic、Retry Topic和DLT topic的消息,我们只希望监听器消费来自main和retry topic的消息。
有没有一种简单的设置方法?
因为我们不希望同一个消费者处理DLT消息。DLT也会被另一个进程使用,以发送请求的通知。
// our configuration
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
List<Class<? extends Throwable>> throwableList = Arrays.asList(IllegalArgumentException.class,
IllegalAccessException.class);
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod(XYZ.class, "xyz")
.exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
.maxAttempts(retryAttempt)
.notRetryOn(throwableList)
.doNotAutoCreateRetryTopics()
.listenerFactory(kafkaListenerContainerFactory())
.setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
.create(template);
}发布于 2021-10-18 19:21:21
只需删除.dltHandlerMethod(XYZ.class, "xyz")即可。
编辑
这仍将创建一个默认的DLT处理程序,该处理程序将只记录记录。
您可以在“其他”消费者中使用不同的组,也可以手动启动容器( DLT容器除外)。
在@KafkaListener上设置autoStartup="false";然后添加以下内容...
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
return args -> {
registry.getListenerContainerIds().forEach(id -> {
if (!id.endsWith("-dlt")) {
registry.getListenerContainer(id).start();
}
});
};
}https://stackoverflow.com/questions/69571401
复制相似问题