我有两个scalaz.concurrent.Task,它们正在执行对不同服务器的HTTP请求。
我希望以类似于Future.firstCompletedOf的方式组合它们,即:并行运行它们,并获得第一个成功完成的结果。
不幸的是,Task.gatherUnordered不是我想要的,因为它在返回结果之前运行每个任务直到完成。
发布于 2017-08-04 20:08:49
不确定如何在scalaz.concurrent中原生实现,但这一点对我来说很有效:
import scalaz.Nondeterminism._
import scalaz.std.either.eitherInstance
import scalaz.syntax.bitraverse._
def race[A, B](t1: Task[A], t2: Task[B]): Task[A \/ B] = {
Nondeterminism[Task].choose(t1, t2).map {
_.bimap(_._1, _._2)
}
}在scalaz.concurrent的继任者fs2中,它是fs2.async#race
发布于 2017-08-11 16:10:47
虽然使用bimap确实是正确的,但有一个替代实现:
import scalaz.concurrent.Task
import scalaz.Nondeterminism
def firstOf[A, B, C](ta: Task[A], tb: Task[B])(fa: A => C, fb: B => C): Task[C] =
Nondeterminism[Task].chooseAny(ta.map(fa), Seq(tb.map(fb))).map(_._1)
val task1 = Task { Thread.sleep(10000); 4 }
val task2 = Task { Thread.sleep(5000); "test" }
firstOf(task1, task2)(_.toString, identity).unsafePerformSync // test在这里,我假设结果的非确定性检索用于获得确切计算时间未知的等价值。因此,该函数将并发执行的转换fa和fb合并为公共类型。它在转换时间很难计算的情况下也很好用-它选择转换后的第一个结果,例如,在HTTP的情况下,一些请求数据提取。对于更简单的情况,从firstOf中检索并行执行映射的race函数的变体,如下所示:
def race[A, B](ta: Task[A], tb: Task[B]): Task[A \/ B] = firstOf(ta, tb)(-\/(_), \/-(_))https://stackoverflow.com/questions/44849196
复制相似问题