我使用以下代码在一个blocking块中获取一个JDBC连接,并将该连接传递给一个fn: Connection => Future[_]。在fn完成后,我想提交/回滚事务并关闭连接。
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
ds.getConnection
}
}.flatMap { conn =>
fn(conn)
.map { r => conn.commit(); conn.close(); r }
.recoverWith {
case e: Throwable =>
conn.rollback()
conn.close()
throw e
}
}我使用了一个基于ForkJoinPool的单独的执行上下文。
有了足够的调用,这段代码就会陷入死锁。直觉上,这是有道理的。使用getConnection调用的第一个将来会在等待可用连接时被阻塞,而可用连接正在等待ExecutionContext中的可用线程运行commit(); close()块,以释放连接并释放执行上下文中的线程以供getConnection运行。我用线程转储验证了这一点。
我发现解决这个问题的唯一方法是在同一个Future {}上运行所有东西,从而避免切换上下文:
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
val conn = ds.getConnection
try {
conn.setAutoCommit(false)
val r = Await.result(fn(conn), Duration.Inf)
conn.commit()
r
} catch {
case e: Throwable =>
conn.rollback()
throw e
} finally
conn.close()
}
}但是这样我就在Await.result上屏蔽了。我想这不是一个大问题,因为我在blocking块中阻塞,但我担心这会有不可预见的后果,并且不一定是这个接口的调用者所期望的。
有没有办法绕过这个死锁而不使用Await,只依赖于未来的组合呢?
我认为这是一个例子,这个函数不接受Connection => Future[T],而只接受一个Connection => T,但我想保留这个API。
如果我将ForkJoinPool的大小增加到足够大,它就可以工作,但是很难计算/预测所有工作负载的大小,而且我不希望ForkJoinPool的大小是数据库池大小的许多倍。
发布于 2021-07-03 19:16:26
正如评论中提到的,fn正在阻止代码。但是它不在blocking子句中,所以它将占用池中的一个主线程。如果这种情况发生的次数足够多,池中的线程就会耗尽,系统就会死锁。
因此,对fn的调用和随后的代码需要在blocking子句中,以便为它创建一个单独的线程,并且主线程仍然可用于非阻塞代码。
考虑到阻塞代码的数量,可能值得考虑Task模型,每个连接一个线程,而不是每个挂起的操作一个线程,这样线程的数量就会受到限制。这基本上是对HikariCP的一个问题-- getConnection是同步的--的一种变通方法。
https://stackoverflow.com/questions/68234817
复制相似问题