首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >事务性kafka侦听器重试

事务性kafka侦听器重试
EN

Stack Overflow用户
提问于 2022-01-19 23:24:54
回答 1查看 1.3K关注 0票数 0

我正在尝试创建一个Spring @KafkaListener,它既是事务性的(kafa和数据库),也是使用重试的。我用的是春靴。错误处理程序的文档显示

在使用事务时,默认情况下不配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由AfterRollbackProcessor处理。如果在使用事务时提供自定义错误处理程序,则如果希望回滚事务(来源),则必须抛出异常。

但是,当我用@Transactional("kafkaTransactionManager(注释)配置侦听器时,即使我可以清楚地看到,当引发异常时模板回滚生成的消息,容器实际上使用的是非空commonErrorHandler而不是AfterRollbackProcessor。即使在容器工厂中显式将commonErrorHandler配置为null时也是如此。即使在AfterRollbackProcessor耗尽其重试策略之后,我也没有看到任何证据表明已配置的commonErrorHandler被调用过。

我不确定Spring的错误处理在这一点上一般是如何工作的,并希望得到澄清。我想回答的问题是:

  • 使用Spring-Kafka2.8.0配置事务性kafka侦听器的推荐方法是什么?我做得对吗?
  • 是否应该使用公共错误处理程序,而不是回滚后处理器?在根据重试策略再次处理消息之前,它是否回滚当前事务?
  • 一般来说,当我有一个事务性的kafka侦听器时,是否有不止一层的错误处理是我应该知道的?例如,如果我的常见错误处理程序重新抛出类型T的异常,另一个处理程序会捕获它并可能开始自己的重试吗?

谢谢!

我的代码:

代码语言:javascript
复制
@Configuration
@EnableScheduling
@EnableKafka
public class KafkaConfiguration {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConsumerFactory<Object, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Integer, Object>();
    factory.setConsumerFactory(consumerFactory);

    var afterRollbackProcessor =
        new DefaultAfterRollbackProcessor<Object, Object>(
            (record, e) -> LOGGER.info("After rollback processor triggered! {}", e.getMessage()),
            new FixedBackOff(1_000, 1));

    // Configures different error handling for different listeners.
    factory.setContainerCustomizer(
        container -> {
          var groupId = container.getContainerProperties().getGroupId();
          if (groupId.equals("InputProcessorHigh") || groupId.equals("InputProcessorLow")) {
            container.setAfterRollbackProcessor(afterRollbackProcessor);
            // If I set commonErrorHandler to null, it is defaulted instead.
          }
        });
    return factory;
  }
}
代码语言:javascript
复制
@Component
public class InputProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(InputProcessor.class);

  private final KafkaTemplate<Integer, Object> template;
  private final AuditLogRepository repository;

  @Autowired
  public InputProcessor(KafkaTemplate<Integer, Object> template, AuditLogRepository repository) {
    this.template = template;
    this.repository = repository;
  }

  @KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
  @Transactional("kafkaTransactionManager")
  public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
    processInputs(input);
  }

  @KafkaListener(id = "InputProcessorLow", topics = "input-low", concurrency = "1")
  @Transactional("kafkaTransactionManager")
  public void inputLowProcessor(ConsumerRecord<Integer, Input> input) {
    processInputs(input);
  }

  public void processInputs(ConsumerRecord<Integer, Input> input) {
    var key = input.key();
    var message = input.value().getMessage();
    var output = new Output().setMessage(message);

    LOGGER.info("Processing {}", message);
    template.send("output-left", key, output);
    repository.createIfNotExists(message); // idempotent insert
    template.send("output-right", key, output);

    if (message.contains("ERROR")) {
      throw new RuntimeException("Simulated processing error!");
    }
  }
}

我的application.yaml (减去引导服务器和安全配置):

代码语言:javascript
复制
spring:
  kafka:
    consumer:
      auto-offset-reset: 'earliest'
      key-deserializer: 'org.apache.kafka.common.serialization.IntegerDeserializer'
      value-deserializer: 'org.springframework.kafka.support.serializer.JsonDeserializer'
      isolation-level: 'read_committed'
      properties:
        spring.json.trusted.packages: 'java.util,java.lang,com.github.tomboyo.silverbroccoli.*'
    producer:
      transaction-id-prefix: 'tx-'
      key-serializer: 'org.apache.kafka.common.serialization.IntegerSerializer'
      value-serializer: 'org.springframework.kafka.support.serializer.JsonSerializer'

编辑

在加里的帮助下,我想出了办法。正如他们所说的,我们需要在容器上设置kafka事务管理器,以便容器能够启动事务。事务文档没有介绍如何做到这一点,有几种方法。首先,我们可以从工厂获取可变容器属性对象,并在该对象上设置事务管理器。

代码语言:javascript
复制
@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setTransactionManager(...);
    return factory;
}

如果我们处于Spring,我们可以重新使用一些自动配置来在我们的工厂中设置合理的默认值,然后再定制它。我们可以看到,KafkaAutoConfiguration模块导入了生成KafkaAnnotationDrivenConfiguration bean的KafkaAnnotationDrivenConfiguration。这似乎对Spring应用程序中的所有默认配置负责。因此,我们可以注入该bean,并在添加自定义之前使用它初始化工厂:

代码语言:javascript
复制
@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer bootConfigurer,
      ConsumerFactory<Object, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();

    // Apply default spring-boot configuration.
    bootConfigurer.configure(factory, consumerFactory);

    factory.setContainerCustomizer(
        container -> {
          ... // do whatever
        });
    return factory;
  }

一旦完成,容器将使用AfterRollbackProcessor进行错误处理,就像预期的那样。只要我没有显式配置常见的错误处理程序,这似乎是唯一的异常处理层。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-20 14:17:17

只有当容器知道事务时才使用AfterRollbackProcessor;您必须向容器提供一个KafkaTransactionManager,以便容器启动卡夫卡事务,并将偏移发送到事务。使用@Transactional不是启动Kafka事务的正确方式。

请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70779043

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档