首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka Binders for Spring Cloud不使用事务性回滚

Kafka Binders for Spring Cloud不使用事务性回滚
EN

Stack Overflow用户
提问于 2021-07-09 00:55:10
回答 2查看 68关注 0票数 1

我正在尝试通过Spring Cloud在一个事务中向Kafka中的两个单独的主题发送消息。当在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的使用者中,表明消息没有回滚。下面是我的代码:

代码语言:javascript
复制
@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner {

    ChannelHolder channelHolder;
    MessageChannel messageChannel1;
    MessageChannel messageChannel2;

    public KafkaChannelTester(ChannelHolder channelHolder) {
        this.channelHolder = channelHolder;
        this.messageChannel1 = channelHolder.messageChannel1();
        this.messageChannel2 = channelHolder.messageChannel2();
    }

    @Override
    public void run(String... args) throws Exception {
        transactionFail();
    }


    public void throwException(){ throw new RuntimeException();}

    @Transactional
    public void transactionFail(){
        Message<String> message1 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        Message<String> message2 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        messageChannel1.send(message1);
        throwException();
        messageChannel2.send(message2);
    }
    @Bean
    public PlatformTransactionManager transactionManager(BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
        System.out.println(pf.transactionCapable());
        System.out.println(pf.getTransactionIdPrefix());
        KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
        return tm;
    }
}

application.yml包含以下内容:

代码语言:javascript
复制
spring:
  cloud:
    stream:
      bindings:
        cloud-producer-1:
          destination:
            peter.cloud.test.1
        cloud-producer-2:
          destination:
            peter.cloud.test.2
      kafka:
        binder:
          brokers:
            - testkbroker:9092
          transaction:
            transaction-id-prefix: transaction-1-
            producer:
              configuration:
                enable.idempotence: true
                retries: 1
                acks: all

transactionManager中的打印语句确认生产者工厂确实具有事务id前缀,并且具有事务处理能力。我可以做些什么来使事务正常工作?

EN

回答 2

Stack Overflow用户

发布于 2021-07-09 01:26:09

记录始终写入日志,即使它们被回滚也是如此。默认情况下,消费者将看到回滚记录,您必须将消费者属性isolation.level设置为read_committed,以避免获得回滚记录。

https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

票数 0
EN

Stack Overflow用户

发布于 2021-07-09 04:38:11

从不同的类运行事务性方法。如果" run“方法和带”transaction“注释的方法在同一个类中,则CommandLineRunner.run()会绕过事务拦截器,从而防止事务性方法作为事务运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68305804

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档