首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka -未调用重试和恢复

Kafka -未调用重试和恢复
EN

Stack Overflow用户
提问于 2021-02-13 17:29:09
回答 1查看 57关注 0票数 0

我实现了一个自定义侦听器,即不使用@KafkaListener注释,因为我的应用程序需要动态链接到主题。我已经看到了迁移到Spring kafka 2.6.x的建议,但我不能升级,因为我被Spring 5.1.X.RELEASE卡住了(至少现在是这样),这意味着我只能使用Spring-kafka 2.2.x。

我的问题是,如何使用Spring-kafka 2.2.x实现重试、恢复和错误处理?

代码语言:javascript
复制
ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory();
listenerFactory.setConsumerFactory(consumerFactory);
configurer.configure(listenerFactory, consumerFactory);
listenerFactory.setConcurrency(listenerConcurrency); 
listenerFactory.setStatefulRetry(Boolean.TRUE);
listenerFactory.setBatchListener(isBatchListener);
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
listenerFactory.getContainerProperties().setAckOnError(false);
listenerFactory.setRetryTemplate(retryTemplate(kafkaEhCacheRetryManager)); 
listenerFactory.setRecoveryCallback(kafkaRecoverer);

我的重试模板如下所示:

代码语言:javascript
复制
RetryTemplate retryTemplate(EhCacheCacheManager kafkaEhCacheRetryManager) {

ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(initialIntervalForRetries);
exponentialBackOffPolicy.setSleeper(new ThreadWaitSleeper());
exponentialBackOffPolicy.setMultiplier(2.0);
exponentialBackOffPolicy.setMaxInterval(maxIntervalForRetries);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryContextCache(new KafkaEhRetryContextCache(kafkaEhCacheRetryManager));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

// KafkaTransactionalRetryPolicy extends SimpleRetryPolicy
KafkaTransactionalRetryPolicy retryPolicy = new KafkaTransactionalRetryPolicy(kafkaTemplate);
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(kafkaTransactionalRetryPolicy);

return retryTemplate;

}

我的监听器看起来像这样:

代码语言:javascript
复制
public class MyKafkaListener implements MessageListener<String, String> {


    @Override
    @Transactional(value = "chainedKafkaTransactionManager")
    public void onMessage(final ConsumerRecord<String, String> consumerRecord){
       throw new RuntimeException("thrown out of out anger");
    }

}

使用此配置:

代码语言:javascript
复制
spring.kafka:
  bootstrap-servers: ${service.kakfa.host}
  admin:
    client-id: test-consumers
    bootstrap-servers: ${service.kakfa.host}
  consumer:
    bootstrap-servers: ${service.kakfa.host}
    group-id: local-consumers
    client-id: local-consumers
    auto-offset-reset: earliest
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: false
    isolation-level: read_committed
  producer:
    bootstrap-servers: ${service.kakfa.host}
    client-id: local-producer
    acks: all
    retries: 3
    transaction-id-prefix: local-producer-tx-
    properties:
      enable.idempotence: true
      transactional.id: tran-id-1-
      max.in.flight.requests.per.connection: 5
  listener.concurrency: 1

我在StackOverflow上看到了几个关于如何做到这一点的例子,但到目前为止还没有一个有效。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-02-16 23:11:48

您尝试使用的重试机制仅适用于@KafakListener%s。它内置于用于调用监听程序POJO的监听程序适配器中。

在较新的版本中,SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor有一个后退(从2.3开始),不再需要侦听器级别的重试模板,而是支持容器级别的重试。

对于您自己的侦听器,您必须在侦听器代码本身中使用RetryTemplate

顺便说一句,https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions#supported-versions不再支持Spring5.1.x

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66183607

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档