我正在尝试使用spring-data-r2dbc存储库结合TransactionalDatabaseClient来实现事务:
class SongService(
private val songRepo: SongRepo,
private val databaseClient: DatabaseClient
){
private val tdbc = databaseClient as TransactionalDatabaseClient
...
...
fun save(song: Song){
return tdbc.inTransaction{
songRepo
.save(mapRow(song, albumId)) //Mapping to a row representation
.delayUntil { savedSong -> tdbc.execute.sql(...).fetch.rowsUpdated() } //saving a many to many relation
.map(::mapSong) //Mapping back to actual song and retrieve the relationship data.
}
}
}我目前有一个从AbstractR2dbcConfiguration扩展的配置类(用@Configuration和@EnableR2dbcRepositories注释)。在这里,我重写了databaseClient方法以返回一个TransactionalDatabaseClient。这应该与SongService类中的实例相同。
在只进行订阅和打印的测试中运行代码时,我得到了org.springframework.transaction.NoTransactionException: ReactiveTransactionSynchronization not active,并且没有返回关系数据。
但是,在使用项目反应堆分步检验器时,我得到的是java.lang.IllegalStateException: Connection is closed。同样在这种情况下,不会返回关系数据。
顺便说一句,我见过https://github.com/spring-projects/spring-data-r2dbc/issues/44
发布于 2019-03-28 14:23:03
下面是一个有效的Java示例:
@Autowired TransactionalDatabaseClient txClient;
@Autowired Mono<Connection> connection;
//You Can also use: @Autowired Mono<? extends Publisher> connectionPublisher;
public Flux<Void> example {
txClient.enableTransactionSynchronization(connection);
// Or, txClient.enableTransactionSynchronization(connectionPublisher);
Flux<AuditConfigByClub> audits = txClient.inTransaction(tx -> {
txClient.beginTransaction();
return tx.execute().sql("SELECT * FROM audit.items")
.as(Item.class)
.fetch()
.all();
}).doOnTerminate(() -> {
txClient.commitTransaction();
});
txClient.commitTransaction();
audits.subscribe(item -> System.out.println("anItem: " + item));
return Flux.empty()
}我刚开始反应,所以不太确定我在用我的回调做什么哈哈。但我决定选择TransactionalDatabaseClient,而不是DatabaseClient或Connection,因为我会在R2dbc处于当前状态时使用我能获得的所有实用程序。
在您的代码中,您是否实际实例化了一个Connection对象?如果是这样的话,我想你应该已经在你的配置中这样做了。它可以像DatabaseClient一样在整个应用程序中使用,但它稍微复杂一些。
如果不是:
@Bean
@Override // I also used abstract config
public ConnectionFactory connectionFactory() {
...
}
@Bean
TransactionalDatabaseClient txClient() {
...
}
//TransactionalDatabaseClient will take either of these as arg in
//#enableTransactionSynchronization method
@Bean
public Publisher<? extends Connection> connectionPublisher() {
return connectionFactory().create();
}
@Bean
public Mono<Connection> connection() {
return = Mono.from(connectionFactory().create());
}如果您在转换到Kotlin时遇到问题,还有一种启用同步的替代方法可以工作:
// From what I understand, this is a useful way to move between
// transactions within a single subscription
TransactionResources resources = TransactionResources.create();
resources.registerResource(Resource.class, resource);
ConnectionFactoryUtils
.currentReactiveTransactionSynchronization()
.subscribe(currentTx -> sync.registerTransaction(Tx));希望这对Kotlin有很好的解释。
https://stackoverflow.com/questions/54290834
复制相似问题