我正在尝试创建一个Spring @KafkaListener,它既是事务性的(kafa和数据库),也是使用重试的。我用的是春靴。错误处理程序的文档显示
在使用事务时,默认情况下不配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由AfterRollbackProcessor处理。如果在使用事务时提供自定义错误处理程序,则如果希望回滚事务(来源),则必须抛出异常。
但是,当我用@Transactional("kafkaTransactionManager(注释)配置侦听器时,即使我可以清楚地看到,当引发异常时模板回滚生成的消息,容器实际上使用的是非空commonErrorHandler而不是AfterRollbackProcessor。即使在容器工厂中显式将commonErrorHandler配置为null时也是如此。即使在AfterRollbackProcessor耗尽其重试策略之后,我也没有看到任何证据表明已配置的commonErrorHandler被调用过。
我不确定Spring的错误处理在这一点上一般是如何工作的,并希望得到澄清。我想回答的问题是:
谢谢!
我的代码:
@Configuration
@EnableScheduling
@EnableKafka
public class KafkaConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Integer, Object>();
factory.setConsumerFactory(consumerFactory);
var afterRollbackProcessor =
new DefaultAfterRollbackProcessor<Object, Object>(
(record, e) -> LOGGER.info("After rollback processor triggered! {}", e.getMessage()),
new FixedBackOff(1_000, 1));
// Configures different error handling for different listeners.
factory.setContainerCustomizer(
container -> {
var groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("InputProcessorHigh") || groupId.equals("InputProcessorLow")) {
container.setAfterRollbackProcessor(afterRollbackProcessor);
// If I set commonErrorHandler to null, it is defaulted instead.
}
});
return factory;
}
}@Component
public class InputProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(InputProcessor.class);
private final KafkaTemplate<Integer, Object> template;
private final AuditLogRepository repository;
@Autowired
public InputProcessor(KafkaTemplate<Integer, Object> template, AuditLogRepository repository) {
this.template = template;
this.repository = repository;
}
@KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
@Transactional("kafkaTransactionManager")
public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
@KafkaListener(id = "InputProcessorLow", topics = "input-low", concurrency = "1")
@Transactional("kafkaTransactionManager")
public void inputLowProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
public void processInputs(ConsumerRecord<Integer, Input> input) {
var key = input.key();
var message = input.value().getMessage();
var output = new Output().setMessage(message);
LOGGER.info("Processing {}", message);
template.send("output-left", key, output);
repository.createIfNotExists(message); // idempotent insert
template.send("output-right", key, output);
if (message.contains("ERROR")) {
throw new RuntimeException("Simulated processing error!");
}
}
}我的application.yaml (减去引导服务器和安全配置):
spring:
kafka:
consumer:
auto-offset-reset: 'earliest'
key-deserializer: 'org.apache.kafka.common.serialization.IntegerDeserializer'
value-deserializer: 'org.springframework.kafka.support.serializer.JsonDeserializer'
isolation-level: 'read_committed'
properties:
spring.json.trusted.packages: 'java.util,java.lang,com.github.tomboyo.silverbroccoli.*'
producer:
transaction-id-prefix: 'tx-'
key-serializer: 'org.apache.kafka.common.serialization.IntegerSerializer'
value-serializer: 'org.springframework.kafka.support.serializer.JsonSerializer'编辑
在加里的帮助下,我想出了办法。正如他们所说的,我们需要在容器上设置kafka事务管理器,以便容器能够启动事务。事务文档没有介绍如何做到这一点,有几种方法。首先,我们可以从工厂获取可变容器属性对象,并在该对象上设置事务管理器。
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(...);
return factory;
}如果我们处于Spring,我们可以重新使用一些自动配置来在我们的工厂中设置合理的默认值,然后再定制它。我们可以看到,KafkaAutoConfiguration模块导入了生成KafkaAnnotationDrivenConfiguration bean的KafkaAnnotationDrivenConfiguration。这似乎对Spring应用程序中的所有默认配置负责。因此,我们可以注入该bean,并在添加自定义之前使用它初始化工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer bootConfigurer,
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
// Apply default spring-boot configuration.
bootConfigurer.configure(factory, consumerFactory);
factory.setContainerCustomizer(
container -> {
... // do whatever
});
return factory;
}一旦完成,容器将使用AfterRollbackProcessor进行错误处理,就像预期的那样。只要我没有显式配置常见的错误处理程序,这似乎是唯一的异常处理层。
发布于 2022-01-20 14:17:17
只有当容器知道事务时才使用AfterRollbackProcessor;您必须向容器提供一个KafkaTransactionManager,以便容器启动卡夫卡事务,并将偏移发送到事务。使用@Transactional不是启动Kafka事务的正确方式。
请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions
https://stackoverflow.com/questions/70779043
复制相似问题