我有一个简单的任务:通过artemis连接获取数据,->将数据保存到DB。如果我们有开放的会话-使用它。储蓄功能:
fun <T> merge(entity: T): Uni<T> {
return sessionFactory.withTransaction { session -> session.merge(entity) }
}如果我们有一个或罕见的事件,那么它是有效的。但是,如果我们在短时间内有几个事件(示例二),那么并不是所有的数据都会被保存。第一个事件的数据被保存,第二个事件的数据没有保存。使用日志记录和测试,我复制了以下行为:
sessionFactory.withTransaction { session1 ->
session1.merge(entity1).invoke { _ ->
sessionFactory.withTransaction { session2 ->
session2.merge(entity2)
}.subscribe().with(
{ println("success save entity2: $entity2") },
{ error -> println("error save entity2: $error") }
)
}
}.subscribe().with(
{ println("success save entity1: $entity1") },
{ error -> println("error save entity1: $error") }
)也就是说,如果第二个事件的数据记录是在调用合并函数之后开始的,但在会话关闭之前,则不会保存数据。
Next1。如果我使用的是持久化而不是merge (session2.merge -> session2.persist),数据将被保存。
Next2。如果在持久化之前添加find--行为与merge (不保存)相同:
sessionFactory.withTransaction { session2 ->
session2.find(Entry::class.java, entity2.id)
.flatMap { session2.persist(entity2) }
// session2.merge(entity2)
}Next3。如果我在合并之前添加了刷新-我得到了错误:
sessionFactory.withTransaction { session2 ->
session2.flush()
.flatMap { session2.merge(entity2) }
// session2.merge(entity2)
}E 13:24:23 23 [vert.x-eventloop-thread-0] errors.logSqlException - HR000057: Failed to execute statement [$1/* load Entity */ select entity0_.id as id1_3_0_, ... from Entity entity0_ where entity0_.id=$1]: $2could not load an entity: [Entity#b3c699b4-d490-4e56-bb76-904a1cb508a1]
error save entity2: javax.persistence.PersistenceException: org.hibernate.HibernateException: java.util.concurrent.CompletionException: io.vertx.pgclient.PgException: ERROR: current transaction is aborted, commands ignored until end of transaction block (25P02)
error save entity1: javax.persistence.PersistenceException: org.hibernate.HibernateException: io.vertx.pgclient.PgException: ERROR: duplicate key value violates unique constraint "entity_pkey" (23505)其中b3c699b4-d499-4e56-bb76-904a1cb508a1是实体2. is 。
在这种情况下,entity1保存了两次:第一次是在session1结束之后,第二次是在session1关闭之前。
问题:如何使用现有会话保存两个事件的数据?如果每次打开一个新会话,就没有问题。
发布于 2022-08-04 14:16:53
在最初的示例中,您使用的是.invoke而不是.call,不需要订阅两次。但是,您还需要确保同一会话不会在管道之间并行共享。
我的意思是,您不应该依赖这样的代码:
List<Uni<Void>> list = ...
for (Entity entity : entities) {
list.add( session.persist(entity) );
}
Uni.join().all( list ).chain(session::flush);在某些情况下,它可能看起来很有效,但它可能会导致很难调试的错误。那是因为会话不是线程安全的。
在这种情况下,如果要使用相同的会话合并两个实体,则需要确保操作按正确的顺序进行。
例如,这应该是可行的:
sessionFactory.withTransaction { session ->
session.merge(entity1)
.invoke{ entity -> println("success save entity1: $entity") }
.call { _ ->
session.merge(entity2)
.invoke{ entity -> println("success save entity2: $entity") }
}
}.subscribe().with(
{ println("Finished!") },
{ error -> println("Error: $error") }
)https://stackoverflow.com/questions/73237203
复制相似问题