我想组织一个线程屏障:给定一个锁对象,任何线程都可以获得它并继续线程的链,但是任何其他线程都将在同一个锁对象上休眠,直到第一个线程完成并释放锁。
让我们用代码(log()简单地在日志中打印字符串)来表达我的意图:
val mutex = Semaphore(1) // number of permits is 1
source
.subscribeOn(Schedulers.newThread()) // any unbound scheduler (io, newThread)
.flatMap {
log("#1")
mutex.acquireUninterruptibly()
log("#2")
innerSource
.doOnSubscribe(log("#3"))
.doFinally {
mutex.release()
log("#4")
}
}
.subscribe()它实际上运行得很好,我可以看到多个线程是如何显示日志"#1“的,其中只有一个线程进一步传播,获得锁对象互斥,然后释放它,我可以看到其他的日志,接下来的线程就会发挥作用。好的
但有时,当压力很大,线程数量更多时,比如4-5,我会遇到死锁:
实际上,获得锁的线程打印"#1“和"#2”,但是从不打印"#3“(所以没有调用),所以它实际上停止了,什么也不做,没有订阅flatMap中的innerSource。所以所有的线程都被阻塞了,而app完全没有响应。
我的问题是,在flatMap中进行阻塞操作安全吗?我深入研究了flatMap源代码,并看到了它在内部订阅的位置:
if (!isDisposed()) {
o.subscribe(new FlatMapSingleObserver<R>(this, downstream));
}是否有可能线程的订阅,即获得了锁,被以某种方式处理了?
发布于 2019-09-07 10:18:02
您可以使用flatMap第二个参数maxConcurrency并将其设置为1,这样它就可以在不手动锁定的情况下实现所需的功能。
https://stackoverflow.com/questions/57818045
复制相似问题