在将spring boot从2.1.11版本升级到2.2.5之后,kafka客户端将在jpa事务提交之前向代理生成消息。在没有使用链式kafka事务管理器的情况下,事务运行良好。在2.1.x和2.2.x之间的事务中是否引入了向后兼容性更改?有人能提供跨JPA和Kafka的事务管理器吗?
对于JPA,我只使用了以下事务管理器
@Bean
@Primary
public PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {
final JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(emf);
return txManager;
}我对kafka事务使用了以下属性:
spring.cloud.stream.kafka.default.consumer.configuration.isolation.level:事务spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: xyz-0 spring.kafka.producer.Transaction-id- read_committed :xyz 0
发布于 2020-03-23 13:16:08
我们可以使用jpa和kafka创建链式事务。我们可以使用BinderFactory来获取kafka绑定器并创建绑定器kafka事务管理器。
@Bean(name = "chainedTransactionManager")
@Primary
public PlatformTransactionManager chainedTransactionManager(JpaTransactionManager jpaTM,
BinderFactory binders) {
Binder<MessageChannel,?,?> binder = binders.getBinder("kafka", MessageChannel.class);
if (binder instanceof KafkaMessageChannelBinder) {
ProducerFactory<byte[], byte[]> pf =
((KafkaMessageChannelBinder) binder).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
ktm.setTransactionSynchronization(
AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<Object, Object>(jpaTM, ktm);
} else {
return jpaTM;
}
}```https://stackoverflow.com/questions/60717170
复制相似问题