我无法让spring-cloud-stream-binder-kafka在以下用例中工作:
Start @transaction (Rest控制器) DB更新/插入发送Kafka消息
在事务提交之前,消费者(使用@EnableBinding和@StreamListener配置)能够读取记录。此使用者已配置了read_committed隔离级别。
我不确定这是一个问题还是我的任何配置。
尝试配置bean ChainedTransactionManager,但遇到了一些其他问题。
发布于 2020-03-23 13:11:34
下面的方法对我很有效
@Bean(name = "chainedTransactionManager")
@Primary
public PlatformTransactionManager chainedTransactionManager(JpaTransactionManager jpaTM,
BinderFactory binders) {
Binder<MessageChannel,?,?> binder = binders.getBinder("kafka", MessageChannel.class);
if (binder instanceof KafkaMessageChannelBinder) {
ProducerFactory<byte[], byte[]> pf =
((KafkaMessageChannelBinder) binder).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
ktm.setTransactionSynchronization(
AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<Object, Object>(jpaTM, ktm);
} else {
return jpaTM;
}
}欲了解更多详情,请访问https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/729
https://stackoverflow.com/questions/60738761
复制相似问题