首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何用monix的可观测性处理递归?

如何用monix的可观测性处理递归?
EN

Stack Overflow用户
提问于 2017-12-25 13:30:39
回答 1查看 427关注 0票数 0

使用一元,我试图通过构建一个ObservableNode并使用宽度优先算法来遍历一个图。然而,我有一个递归的问题。这里有一个片段说明了我的问题:

代码语言:javascript
复制
package gp

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._


object HelloObservable {

  type Node = Int

  //real case fetch next node across the network so the signature 
  //has to be Node -> List[Task[Node]]
  def nexts(i : Node) : List[Task[Node]] =
    List(Task(i), Task(i+1))

  def go(i :Node) : Task[Iterator[List[Node]]] =
    Task.sequence(nexts(i).sliding(100,100).map(Task.gatherUnordered))

  def explore(r: Node): Observable[Node] = {
    val firsts = for {
      ilr <- Observable.fromTask(go(r))
      lr <- Observable.fromIterator(ilr)
      r <- Observable.fromIterable(lr)
    } yield r

    firsts ++ firsts.flatMap(explore)
  }


  def main(args : Array[String]) : Unit = {

    val obs = explore(0)

    val cancelable = obs
      .dump("O")
      .subscribe()

    scala.io.StdIn.readLine()

  }

}

第一次迭代后可观察到的停止。有人能告诉我为什么吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-12-26 05:43:04

我认为这个问题与递归无关。我认为这是因为您使用的是sliding,它返回一个IteratorIteratorIterable的主要区别在于,您只能使用Iterator一次,然后只剩下一个空的Iterator。这意味着当您执行firsts.flatMap时,Observable.fromIterator(ilr)中什么都没有了,因此不会产生任何结果。

从根本上说,我认为如果你不能在内存中保持前缀(大部分),你就无法进行广度优先搜索。但是,由于您的nexts已经返回了List,所以我假设您可以在内存中拥有该列表的两个副本。第二个副本是sliding的物化结果。所以你的固定代码应该是这样的:

代码语言:javascript
复制
object HelloObservable {

    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import monix.reactive._

    type Node = Int

    //real case fetch next node across the network so the signature
    //has to be Node -> List[Task[Node]]
    def nexts(i: Node): List[Task[Node]] = List(Task(i), Task(i + 1))

    def go(i: Node): Task[List[List[Node]]] =
      Task.sequence(nexts(i).sliding(100, 100).toList.map(Task.gatherUnordered))


    def explore(r: Node): Observable[Node] = {
      val firsts = for {
        ilr <- Observable.fromTask(go(r))
        lr <- Observable.fromIterable(ilr)
        r <- Observable.fromIterable(lr)
      } yield r
      firsts ++ firsts.flatMap(explore)
    }


    def main(args: Array[String]): Unit = {

      val obs = explore(0)

      val cancelable = obs
        .dump("O")
        .subscribe()

      scala.io.StdIn.readLine()

    }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47969092

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档