首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Monix是如何使用flatMap操作符的背压的?

Monix是如何使用flatMap操作符的背压的?
EN

Stack Overflow用户
提问于 2019-04-25 07:07:26
回答 1查看 270关注 0票数 4

Monix使用Ack同步发出的消息,但是如果我使用groupBy和flatMap,内部可观察到的不跟随来自source的背压。

请参阅此测试代码:

代码语言:javascript
复制
import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test


class MonixBackpressureWithGroupByTest2 {
  @Test
  def test(): Unit = {
    val source = Observable.range(0,130)

    val backPressuredStream = source.map(x => {
        println("simple log first  map - " + x)
        x
      })
      .asyncBoundary(OverflowStrategy.BackPressure(5))
      .map { i =>

        println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
        ((i % 3) toString) -> i
      }
      .groupBy{case (k, v) => k}
      .flatMap(x => {
        val mapWithSleep = x.map{case groupedMsg@(key, value) =>
          Thread.sleep(2000)
          println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
          groupedMsg
        }

        mapWithSleep

      })

    backPressuredStream.share.subscribe(
      (keyAndValue: (String, Long)) => Continue
    )

    global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
      println("========sleep 1 second ============")
    })

    Thread.currentThread().join()

  }

}

产出:

代码语言:javascript
复制
...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...

看上去有些背压不匹配:

后:sleep 2 second for every message ...背压给三项after backpressure map - ...

在背压方面,sleep 2 second for every message ...如何与after backpressure map - ...有一对一的关系?

还有另一个疑问:为什么日志的sleep 2 second for every message输出(0, 72), (0, 75), (0,78)但是这样的事情(0, 72), (1, 73), (2,74)

谢谢。

Monix版本:"io.monix" %% "monix" % "3.0.0-RC1"

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-26 12:35:51

你所看到的行为正是你所能期望的。

为了快速总结您的应用程序,让我用我的话来解释它:

您有一个Observable生成数字,并对每个元素做一些副作用。

接下来,将元素按_ % 3分组。

接下来,在每个组的Observable中做一些更多的副作用(睡眠和写入控制台)。

然后,您可以flatMap每个组的Observable,从而生成一个单一的、平坦的Observable

那么,为什么在开始时只看到第一个组(其中_ % 3 == 0)将内容打印到控制台?*

答案在于flatMap:当查看文档 for Observable时,您将发现对flatMap的以下描述

代码语言:javascript
复制
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]

Alias for concatMap.

[...]

想一想Observables,就像考虑Lists一样:当您连接Lists时,您将得到一个List,它首先包含第一个List的元素,然后是第二个List的元素,依此类推。

在Monix中,通过等待在Observable (read:concatMap)操作中生成的第一个Observable来发送“已完成”的信号,就可以实现对flatMap的相同行为。只有这样,第二个Observable才会被消费,依此类推。

或者简单地说,flatMap 关心产生的的序列。

但是,您的Observable操作中的flatMap什么时候“完成”?为此,我们必须了解groupBy是如何工作的,因为这就是它们的来源。

要使groupBy工作,尽管Observable的值是延迟计算的,但它必须将传入的元素存储在缓冲区中。我不能100%肯定这一点,但是如果groupBy像我想的那样工作,那么对于任何提取下一个元素的分组Observable,它都会无限期地遍历原始Observable,直到它找到属于这个组的元素为止,保存属于该缓冲区中其他组的所有先前(但还不是必需的)元素,供以后使用。

所有这一切都意味着,在源groupBy信号完成之前,Observable无法知道一个组的所有元素是否已经找到,然后它将使用所有剩余的缓冲元素,然后向分组的Observables发送信号完成。

用更简单的话说:由** groupBy 生成的在源 Observable 完成后才能完成。

当将所有这些信息结合在一起时,您将理解只有当源可观测性(您的Observable.range(0, 130))已经完成时,第一个分组Observable也将被完成,因为只有flatMap,所有其他分组Observable才会被使用。

因为我从您的上一个问题中了解到,您正在尝试构建一个网络套接字,使用flatMap是个坏主意--您的传入请求的源Observable永远不会完成,只会有效地提供您遇到的第一个IP地址。

您所要做的是使用mergeMap**.**与concatMap相比,mergeMap并不关心元素的顺序,而是使用“先到先得”--规则适用。

*:当你到达我的解释的结尾,并希望理解groupByflatMap的工作原理时,你就会明白我为什么写“在开始”!

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55843648

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档