我正在尝试通过Spring Cloud在一个事务中向Kafka中的两个单独的主题发送消息。当在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的使用者中,表明消息没有回滚。下面是我的代码:
@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner {
ChannelHolder channelHolder;
MessageChannel messageChannel1;
MessageChannel messageChannel2;
public KafkaChannelTester(ChannelHolder channelHolder) {
this.channelHolder = channelHolder;
this.messageChannel1 = channelHolder.messageChannel1();
this.messageChannel2 = channelHolder.messageChannel2();
}
@Override
public void run(String... args) throws Exception {
transactionFail();
}
public void throwException(){ throw new RuntimeException();}
@Transactional
public void transactionFail(){
Message<String> message1 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
Message<String> message2 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
messageChannel1.send(message1);
throwException();
messageChannel2.send(message2);
}
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
System.out.println(pf.transactionCapable());
System.out.println(pf.getTransactionIdPrefix());
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
return tm;
}
}application.yml包含以下内容:
spring:
cloud:
stream:
bindings:
cloud-producer-1:
destination:
peter.cloud.test.1
cloud-producer-2:
destination:
peter.cloud.test.2
kafka:
binder:
brokers:
- testkbroker:9092
transaction:
transaction-id-prefix: transaction-1-
producer:
configuration:
enable.idempotence: true
retries: 1
acks: alltransactionManager中的打印语句确认生产者工厂确实具有事务id前缀,并且具有事务处理能力。我可以做些什么来使事务正常工作?
发布于 2021-07-09 01:26:09
记录始终写入日志,即使它们被回滚也是如此。默认情况下,消费者将看到回滚记录,您必须将消费者属性isolation.level设置为read_committed,以避免获得回滚记录。
https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
发布于 2021-07-09 04:38:11
从不同的类运行事务性方法。如果" run“方法和带”transaction“注释的方法在同一个类中,则CommandLineRunner.run()会绕过事务拦截器,从而防止事务性方法作为事务运行。
https://stackoverflow.com/questions/68305804
复制相似问题