首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在2.5.4版本中发布Spring

如何在2.5.4版本中发布Spring
EN

Stack Overflow用户
提问于 2020-08-06 05:40:03
回答 1查看 1K关注 0票数 0

需要你的帮助和指导。

我在当前的项目中使用了2.2.X版本spring。

我创建的错误处理如下所示:

代码语言:javascript
复制
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}

然后,我将我所有的项目依赖版本(如spring和spring)升级到最新版本:2.5.4发行版

我发现有些方法被废弃并改变了。

SeekToCurrentErrorHandler

代码语言:javascript
复制
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
    // recover after 3 failures, woth no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));

我的问题是,如何使用以下配置生成DLQ:

编辑的

代码语言:javascript
复制
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}

public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}

public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }

                return new TopicPartition("topic-fail", -1);
            });
}

public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}

这种配置工作,感谢加里!

提前感谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-06 14:24:33

不清楚你说的是什么意思

问题是,在文档中,它仍然使用旧的方法,这个方法对于2.5.X版本来说是不可取的

KafkaOperationsKafkaTemplate实现的接口;您需要做的唯一更改就是将maxAttempts更改为BackOff.

代码语言:javascript
复制
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63277494

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档