首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将一个scalaz流进程拆分为两个子流。

将一个scalaz流进程拆分为两个子流。
EN

Stack Overflow用户
提问于 2014-12-17 09:35:49
回答 3查看 1.1K关注 0票数 8

是否可以分割/分叉,然后重新加入流?

举个例子,假设我有以下函数

代码语言:javascript
复制
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers  = streamOfNumbers.filter(isOdd).fold(0)(add)

zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )

在本例中,对于黄曲柳,结果将如您所料--从1到10的数字元组传递到接收器。

但是,如果我们用需要IO的东西替换streamOfNumbers,它实际上将执行两次IO操作。

使用Topic,我可以创建一个pub/sub进程,正确地复制流中的元素,但是它不缓冲--它只是尽可能快地使用整个源,而不管接收器消耗它的速度如何。

我可以将它封装在一个有限制的队列中,但是最终的结果感觉比需要的要复杂得多。

是否有一种更简单的方法可以在不重复IO操作的情况下从源中分割流?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2014-12-18 08:25:10

另外,要用“拆分”的要求来澄清前面的答案delas。您的特定问题的解决方案可能不需要拆分流:

代码语言:javascript
复制
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int\/Int] = streamOfNumbers.map {
   case even if even % 2 == 0 => right(even)
   case odd => left(odd)
} 
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)

val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???

summed
.drainW(evenSink)
.to(oddSink)
票数 6
EN

Stack Overflow用户

发布于 2014-12-18 08:16:13

您也许仍然可以使用topic,并且只需确保子进程在您推送到主题之前就会订阅。

不过,请注意,此解决方案没有任何限制,即,如果您将推动太快,您可能会遇到OOM错误。

代码语言:javascript
复制
def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = {
  val topic = async.topic[A]

  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain))
}
票数 2
EN

Stack Overflow用户

发布于 2016-09-02 21:53:23

我同样需要这个功能。我的处境要复杂得多,不允许我以这种方式工作。

多亏了丹尼尔·斯皮瓦克在这条线上的回应,我才得以让下面的内容发挥作用。我改进了他的解决方案,添加了onHalt,这样我的应用程序就会在Process完成后退出。

代码语言:javascript
复制
def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) }
  }
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27522343

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档