首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >scalaz-stream中的异步“节点”

scalaz-stream中的异步“节点”
EN

Stack Overflow用户
提问于 2015-08-11 17:29:08
回答 1查看 42关注 0票数 1

我有一个Process[Task, A],我需要在流的每个A上运行一个函数A => B,它的运行时间范围从瞬时到很长,以产生一个Process[Task, B]

问题是,我希望在ExecutionContext中尽可能快地处理每个A,并在得到结果后尽快传递结果,而不管接收A的顺序如何。

一个具体的例子是下面的代码,我希望所有的奇数都能立即打印出来,而偶数大约要在500ms后打印出来。取而代之的是打印(奇数,偶数)对,并以500ms的停顿进行交错:

代码语言:javascript
复制
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if(i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
  .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
EN

回答 1

Stack Overflow用户

发布于 2015-08-11 18:22:26

事实证明答案是使用通道。下面是更新后的代码,看起来完全符合我的要求:

代码语言:javascript
复制
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
  val chan = channel.lift[Task, Int, Int] { i => Task {
    if(i % 2 == 0) Thread.sleep(500)
    i
  }}

  merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
    .to(io.printStreamSink(System.out)(_ println _)).run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31938080

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档