首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据库事务不回滚

数据库事务不回滚
EN

Stack Overflow用户
提问于 2021-08-26 15:23:13
回答 1查看 738关注 0票数 0

我正在尝试编写一个spring云流函数(Spring-start-parent2.5.3,java 11,spring Version2020.0.3),它同时具有Kafka和Postgres事务。每当使用的消息以字符串"fail“开头时,该函数将引发一个模拟错误,我希望这会导致数据库事务回滚,然后导致kafka事务回滚。(我知道卡夫卡的交易不是XA,这很好。)到目前为止,我还没有让数据库事务工作,但是kafka事务确实起作用。

目前,我使用的是一个@Transactional注释,它似乎没有启动数据库事务。( Kafka 粘结剂文件建议使用ChainedTransactionManager同步数据库+卡夫卡事务,但春季卡夫卡文献表示不赞成使用@Transactional 注解,S.C.S. 这个问题的例子使用由start-jpa库创建的@Transactional注释和默认事务管理器(我认为)。我可以在我的调试器中看到,无论我是否在我的使用者上使用@EnableTransactionManagement和使用@Transactional,消费者都是使用堆栈中较高的事务模板在kafka事务中执行的,但我在任何地方都看不到数据库事务。

我有几个问题想要理解:

  • 卡夫卡侦听器容器在卡夫卡事务的上下文中运行我的消费者(不管我是否有@Transactional注释),对吗?如果是这样的话,有没有办法只在Kafka交易中运行特定的函数呢?
  • 由于容器没有办法拦截对生产者的调用(据我所知),上述情况会对生产者产生影响吗?
  • 我应该做什么来同步一个Kafka和一个数据库事务,以便DB提交在Kafka提交之前发生?

我有以下Crud、处理程序集合和application.yml:

代码语言:javascript
复制
@Repository
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {

  /**
   * Create a new audit log entry if and only if another with the same message does not already
   * exist. This is idempotent.
   */
  @Transactional
  @Modifying
  @Query(
      nativeQuery = true,
      value = "insert into audit_log (message) values (?1) on conflict (message) do nothing")
  void createIfNotExists(String message);
}
代码语言:javascript
复制
@Profile("ft")
@Configuration
@EnableTransactionManagement
public class FaultTolerantHandlers {

  private static final Logger LOGGER = LoggerFactory.getLogger(FaultTolerantHandlers.class);

  @Bean
  public NewTopic inputTopic() {
    return TopicBuilder.name("input").partitions(1).replicas(1).build();
  }

  @Bean
  public NewTopic inputDltTopic() {
    return TopicBuilder.name("input.DLT").partitions(1).build();
  }

  @Bean
  public NewTopic leftTopic() {
    return TopicBuilder.name("left").partitions(1).build();
  }

  @Bean
  public NewTopic rightTopic() {
    return TopicBuilder.name("right").partitions(1).build();
  }

  @Bean
  public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
      LOGGER.info("Producing messages to input...");
      template.send("input", "pass-1".getBytes());
      template.send("input", "fail-1".getBytes());
      template.send("input", "pass-2".getBytes());
      template.send("input", "fail-2".getBytes());
      LOGGER.info("Produced input.");
    };
  }

  @Bean
  ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
      BinderFactory binders) {
    return (container, dest, group) -> {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
              .getTransactionalProducerFactory();
      KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(requireNonNull(pf));
      container.setAfterRollbackProcessor(
          new DefaultAfterRollbackProcessor<>(
              new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L)));
    };
  }

  // Receive messages from `input`.
  // For each input, write an audit log to the database.
  // For each input, produce a message to both `left` and `right` atomically.
  // After three failed attempts to achieve the above, shuffle the message
  // off to `input.DLT` and move on.
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {
    return input -> {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    };
  }

  @Bean
  public Consumer<Message<String>> logger() {
    return message -> {
      var receivedTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
      LOGGER.info("Received on topic=" + receivedTopic + " payload=" + message.getPayload());
    };
  }
}
代码语言:javascript
复制
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
          required-acks: all
      bindings:
        persistAndSplit-in-0:
          destination: input
          group: input
        logger-in-0:
          destination: left,right,input.DLT
          group: logger
          consumer:
            properties:
              isolation.level: read_committed
    function:
      definition: persistAndSplit;logger

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-26 16:38:11

代码语言:javascript
复制
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {

在本例中,@Transactional位于bean定义(在应用程序初始化期间只执行一次);要获得运行时事务,需要对lambda中的代码进行如此注释;例如.

代码语言:javascript
复制
  @Bean
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository,
      TxCode code
  ) {
    return Txcode:run;
  }
代码语言:javascript
复制
@Component
class TxCode {

    @Autowired
    AuditLogRepository repository

    @Autowired
    StreamBridge bridge;

    @Transactional
    void run(String input) {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    };
}

(或者你也可以通过桥和回购)。

代码语言:javascript
复制
return str -> code.run(str, repo, bridge);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68941306

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档