首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring Kafka中的事务同步

Spring Kafka中的事务同步
EN

Stack Overflow用户
提问于 2017-11-17 23:53:57
回答 4查看 16.3K关注 0票数 13

我想将kafka事务与存储库事务同步:

代码语言:javascript
复制
@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}

由于合并(https://github.com/spring-projects/spring-kafka/issues/373),并且根据文档,这是可能的。然而,我在理解和实现该功能时遇到了问题。查看https://docs.spring.io/spring-kafka/reference/html/#transaction-synchronization中的示例,我必须创建一个MessageListenerContainer来侦听我自己的事件。我仍然需要使用KafkaTemplate发送我的事件吗?MessageListenerContainer是否禁止向代理发送消息?

如果我理解正确,kafkaTemplate和kafkaTransactionManager必须使用相同的producerFactory,其中我必须启用事务设置transactionIdPrefix。在我的示例中,我必须将messageListenerContainer的TransactionManager设置为DataSourceTransactionManager。对吗?

从我的角度来看,我通过kafkaTemplate发送事件,监听我自己的事件,然后再次使用kafkaTemplate转发事件,这看起来很奇怪。

如果我能得到一个简单的kafka事务与存储库事务同步的示例和解释,我会真的很有帮助。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-11-18 01:04:35

如果监听器容器提供了KafkaTransactionManager,容器会创建一个生产者,供任何下游kafka模板使用,容器会将偏移量发送给事务。

如果容器有其他事务管理器,容器就不能发送偏移量,因为它不能访问生产者(或模板)。

另一种解决方案是使用@Transactional注释您的方法(使用数据源TM),并使用kafka配置容器。

这样,您的DB tx将在线程返回到容器之前提交,容器然后将偏移量发送到kafka事务并提交它。

有关示例,请参阅the framework test cases

票数 10
EN

Stack Overflow用户

发布于 2018-01-05 22:46:05

@Eike Behrends有一个db + kafka事务,你可以使用ChainedTransactionManager并这样定义它:

代码语言:javascript
复制
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}


@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}

您需要注释事务性db+kafka方法@Transactional("chainedTransactionManager")

(你可以在spring-kafka项目上看到这个问题:https://github.com/spring-projects/spring-kafka/issues/433 )

你可以说:

从我的角度看,我通过kafkaTemplate发送事件,监听我自己的事件,然后再次使用kafkaTemplate转发事件,这看起来很奇怪。

你试过这个吗?如果是这样的话,你能提供一个例子吗?

票数 7
EN

Stack Overflow用户

发布于 2021-03-09 19:25:36

为了实现您的目标,您应该使用不同的“最终一致”方法,如CDC (变更数据捕获)。在Kafka写入和任何其他系统(例如数据库)之间没有原子事务,也就是XA事务。当你拥有分布式服务(有些人称之为微服务)时,这是一个完整的范式快速,在你的情况下,这些服务可能通过产生/消费Kafka主题进行通信。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47354521

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档