I need your help and guidance.
I am using Spring (spring-kafka) 2.2.x in my current project.
My error handling is set up as follows:

    @Bean("kafkaConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 2.2.x style: hand the record to the recoverer after 3 failed deliveries
        factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
        return factory;
    }

    public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
        // A negative partition lets the producer choose the partition
        return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
                (record, ex) -> new TopicPartition("topic-undelivered", -1));
    }

Then I upgraded all of my project's Spring dependencies (including spring-kafka) to the latest version, 2.5.4.RELEASE.
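I found that some methods have been deprecated or changed. The documentation shows:

    SeekToCurrentErrorHandler errorHandler =
        new SeekToCurrentErrorHandler((record, exception) -> {
            // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
        }, new FixedBackOff(0L, 2L));

My question is: how do I set up the DLQ with the following configuration?

Edit: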

    @Bean("kafkaConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(consumerConcurrencyCount);
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    public SeekToCurrentErrorHandler errorHandler() {
        // No delay between attempts, 2 retries (3 deliveries in total) before recovery
        return new SeekToCurrentErrorHandler(
                deadLetterPublishingRecoverer(),
                new FixedBackOff(0L, 2L)
        );
    }

    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
        return new DeadLetterPublishingRecoverer(
                getEventKafkaTemplate(),
                (record, ex) -> {
                    // Route by the cause of the failure; a negative partition lets the producer choose
                    if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                        return new TopicPartition("topic-undelivered", -1);
                    }
                    return new TopicPartition("topic-fail", -1);
                });
    }

    public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
    }

This configuration works, thanks Gary!

Thanks in advance.
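
The routing rule inside the destination resolver can be tried out without a broker. The following is only a minimal sketch of that decision in plain Java: `DeadLetterRouting`, `resolveTopic`, and the two nested exception classes are hypothetical stand-ins for the application's own types, and the method returns just the topic name rather than a `TopicPartition`. It checks `getCause()` the same way the lambda above does, on the assumption that the exception handed to the resolver wraps the one thrown by the listener.

```java
public class DeadLetterRouting {

    // Hypothetical stand-ins for the application's own exception types.
    public static class BusinessException extends RuntimeException {
        public BusinessException(String message) {
            super(message);
        }
    }

    public static class TechnicalException extends RuntimeException {
        public TechnicalException(String message) {
            super(message);
        }
    }

    /**
     * Picks the dead-letter topic from the cause of the failure.
     * instanceof is false for null, so an exception without a cause
     * falls through to the default topic.
     */
    public static String resolveTopic(Exception ex) {
        Throwable cause = ex.getCause();
        if (cause instanceof BusinessException || cause instanceof TechnicalException) {
            return "topic-undelivered";
        }
        return "topic-fail";
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException("listener failed", new BusinessException("invalid payload"));
        Exception noCause = new IllegalStateException("unexpected");

        System.out.println(resolveTopic(wrapped)); // prints "topic-undelivered"
        System.out.println(resolveTopic(noCause)); // prints "topic-fail"
    }
}
```

Keeping the decision in a small method like this makes it possible to unit-test the routing separately from the Kafka configuration.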
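Posted on 2020-08-06 14:24:33

It's not clear what you mean by:

"The problem is that the documentation still uses the old way, which is deprecated in version 2.5.x."

KafkaOperations is the interface that KafkaTemplate implements; the only change you need to make is to replace maxAttempts with a BackOff.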
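
To make the mapping between the two styles concrete, here is a small plain-Java sketch of the arithmetic. It does not use any Spring classes; `BackOffArithmetic` and both of its methods are hypothetical names introduced only for illustration, and rejecting values below 1 is a simplification of this sketch. The idea is that the old integer counts failed deliveries, while the second argument of `FixedBackOff` counts retries after the first delivery.

```java
public class BackOffArithmetic {

    /**
     * Converts "recover after this many failed deliveries" into the number
     * of retries a fixed back-off needs. The first delivery is not a retry,
     * so retries = failures - 1.
     */
    public static long retriesForMaxFailures(int maxFailures) {
        if (maxFailures < 1) {
            throw new IllegalArgumentException("maxFailures must be at least 1");
        }
        return maxFailures - 1L;
    }

    /**
     * Simulates a listener that always fails and counts how many times the
     * record is delivered before recovery would take place.
     */
    public static int deliveriesBeforeRecovery(long maxRetries) {
        int deliveries = 1; // the initial delivery
        long retriesUsed = 0;
        while (retriesUsed < maxRetries) {
            retriesUsed++;
            deliveries++;
        }
        return deliveries;
    }

    public static void main(String[] args) {
        long retries = retriesForMaxFailures(3);
        System.out.println("retries = " + retries);                              // prints "retries = 2"
        System.out.println("deliveries = " + deliveriesBeforeRecovery(retries)); // prints "deliveries = 3"
    }
}
```

In the configuration that follows, `new FixedBackOff(0L, 2L)` therefore corresponds to the old integer argument of 3: no delay between attempts, two retries, three deliveries in total.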

    @Bean("kafkaConsumer")
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // The BackOff replaces the old integer argument: no delay, 2 retries
        factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0L, 2L)));
        return factory;
    }

    public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
        return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
                (record, ex) -> new TopicPartition("topic-undelivered", -1));
    }

https://stackoverflow.com/questions/63277494