我有一个Process[Task, A],我需要在流的每个A上运行一个函数A => B,它的运行时间范围从瞬时到很长,以产生一个Process[Task, B]。
问题是,我希望在ExecutionContext中尽可能快地处理每个A,并在得到结果后尽快传递结果,而不管接收A的顺序如何。
一个具体的例子是下面的代码,我希望所有的奇数都能立即打印出来,而偶数大约要在500ms后打印出来。取而代之的是打印(奇数,偶数)对,并以500ms的停顿进行交错:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
Process.range(0, 100).flatMap { i =>
Process.eval(Task.apply {
if(i % 2 == 0) Thread.sleep(500)
i
}(executor))
}.to(io.printStreamSink(System.out)(_ println _))
.run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}发布于 2015-08-11 18:22:26
事实证明答案是使用通道。下面是更新后的代码,看起来完全符合我的要求:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
val chan = channel.lift[Task, Int, Int] { i => Task {
if(i % 2 == 0) Thread.sleep(500)
i
}}
merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
.to(io.printStreamSink(System.out)(_ println _)).run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}https://stackoverflow.com/questions/31938080
复制相似问题