Monix使用Ack同步发出的消息,但是如果我使用groupBy和flatMap,内部可观察到的不跟随来自source的背压。
请参阅此测试代码:
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test
class MonixBackpressureWithGroupByTest2 {
@Test
def test(): Unit = {
val source = Observable.range(0,130)
val backPressuredStream = source.map(x => {
println("simple log first map - " + x)
x
})
.asyncBoundary(OverflowStrategy.BackPressure(5))
.map { i =>
println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
((i % 3) toString) -> i
}
.groupBy{case (k, v) => k}
.flatMap(x => {
val mapWithSleep = x.map{case groupedMsg@(key, value) =>
Thread.sleep(2000)
println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
groupedMsg
}
mapWithSleep
})
backPressuredStream.share.subscribe(
(keyAndValue: (String, Long)) => Continue
)
global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
println("========sleep 1 second ============")
})
Thread.currentThread().join()
}
}产出:
...
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...看上去有些背压不匹配:
后:sleep 2 second for every message ...背压给三项after backpressure map - ...
在背压方面,sleep 2 second for every message ...如何与after backpressure map - ...有一对一的关系?
还有另一个疑问:为什么日志的sleep 2 second for every message输出(0, 72), (0, 75), (0,78)但是这样的事情(0, 72), (1, 73), (2,74)?
谢谢。
Monix版本:"io.monix" %% "monix" % "3.0.0-RC1"
发布于 2019-04-26 12:35:51
你所看到的行为正是你所能期望的。
为了快速总结您的应用程序,让我用我的话来解释它:
您有一个Observable生成数字,并对每个元素做一些副作用。
接下来,将元素按_ % 3分组。
接下来,在每个组的Observable中做一些更多的副作用(睡眠和写入控制台)。
然后,您可以flatMap每个组的Observable,从而生成一个单一的、平坦的Observable。
那么,为什么在开始时只看到第一个组(其中_ % 3 == 0)将内容打印到控制台?*
答案在于flatMap:当查看文档 for Observable时,您将发现对flatMap的以下描述
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Alias for concatMap.
[...]想一想Observables,就像考虑Lists一样:当您连接Lists时,您将得到一个List,它首先包含第一个List的元素,然后是第二个List的元素,依此类推。
在Monix中,通过等待在Observable (read:concatMap)操作中生成的第一个Observable来发送“已完成”的信号,就可以实现对flatMap的相同行为。只有这样,第二个Observable才会被消费,依此类推。
或者简单地说,flatMap 关心产生的的序列。
但是,您的Observable操作中的flatMap什么时候“完成”?为此,我们必须了解groupBy是如何工作的,因为这就是它们的来源。
要使groupBy工作,尽管Observable的值是延迟计算的,但它必须将传入的元素存储在缓冲区中。我不能100%肯定这一点,但是如果groupBy像我想的那样工作,那么对于任何提取下一个元素的分组Observable,它都会无限期地遍历原始Observable,直到它找到属于这个组的元素为止,保存属于该缓冲区中其他组的所有先前(但还不是必需的)元素,供以后使用。
所有这一切都意味着,在源groupBy信号完成之前,Observable无法知道一个组的所有元素是否已经找到,然后它将使用所有剩余的缓冲元素,然后向分组的Observables发送信号完成。
用更简单的话说:由** groupBy 生成的在源 Observable 完成后才能完成。
当将所有这些信息结合在一起时,您将理解只有当源可观测性(您的Observable.range(0, 130))已经完成时,第一个分组Observable也将被完成,因为只有flatMap,所有其他分组Observable才会被使用。
因为我从您的上一个问题中了解到,您正在尝试构建一个网络套接字,使用flatMap是个坏主意--您的传入请求的源Observable永远不会完成,只会有效地提供您遇到的第一个IP地址。
您所要做的是使用mergeMap**.**与concatMap相比,mergeMap并不关心元素的顺序,而是使用“先到先得”--规则适用。
*:当你到达我的解释的结尾,并希望理解groupBy和flatMap的工作原理时,你就会明白我为什么写“在开始”!
https://stackoverflow.com/questions/55843648
复制相似问题