首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法同步Kafka和MQ事务usingChainedKafkaTransaction

无法同步Kafka和MQ事务usingChainedKafkaTransaction
EN

Stack Overflow用户
提问于 2020-05-21 02:43:03
回答 1查看 425关注 0票数 0

我们有一个spring引导应用程序,它使用来自IBM的消息,进行一些转换,并将结果发布到Kafka主题。为此,我们使用https://spring.io/projects/spring-kafka。我知道Kafka不支持XA;但是,在文档中,我发现了一些关于使用ChainedKafkaTransactionManager链接多个事务管理器和同步事务的输入。同一文档还提供了一个示例,说明如何同步Kafka和数据库,同时读取Kafka的消息并将它们存储在数据库中。

在我的示例中,我遵循同样的示例,并将JmsTransactionManagerKafkaTransactionManager链接到ChainedKafkaTransactionManager的伞下。bean定义如下:

代码语言:javascript
复制
@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
    factory.setTransactionManager(this.jmsTransactionManager());
    return factory;
}

@Bean
public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
}

@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
        JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {

    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}

@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Processing the message here then publishing it to Kafka using KafkaTemplate
    kafkaTemplate.send(sourceTopic,transformedMessage);

    // Then throw an exception just to test the transaction behaviour
    throw new RuntimeException("Not good Pal!");
}

在运行应用程序时,所发生的情况是消息一直被回滚到MQ队列中,但是消息在Kafka主题中不断增加,这意味着kafkaTemplate交互不会得到回滚。

如果我能很好地理解文档,那么情况就不应该是这样了。“如果事务是活动的,则在事务范围内执行的任何KafkaTemplate操作都使用事务的生产者。”

在我们的application.yaml中,我们将卡夫卡生产者配置为通过设置spring.kafka.producer.transaction-id-prefix来使用事务

问题是我在这里错过了什么,我应该如何解决它。预先感谢您的投入。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-21 13:30:24

默认情况下,使用者可以看到未提交的记录;将isolation.level使用者属性设置为read_committed,以避免接收回滚事务的记录。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61926306

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档