有没有办法把Seq[FutureX]变成Enumerator[X]?用例是我想通过抓取网络来获取资源。这将返回一个Futures序列,我想返回一个枚举数,它将按照期货首次完成的顺序推送到Iteratee。
看起来Victor Klang的Future select gist可以用来做这件事--尽管它看起来效率很低。
注意:有问题的迭代器和枚举器是由play框架版本2.x提供的,即具有以下导入:import play.api.libs.iteratee._
发布于 2014-06-26 15:53:57
我确实意识到这个问题已经有点老了,但基于Santhosh的回答和内置的Enumterator.enumerate()实现,我得出了以下结论:
def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = {
val it = traversable.toIterator
Enumerator.generateM {
if (it.hasNext) {
val next: Future[E] = it.next()
next map {
e => Some(e)
}
} else {
Future.successful[Option[E]] {
None
}
}
}
}请注意,与第一个基于Viktor-select的解决方案不同,这个解决方案保留了顺序,但您仍然可以在此之前异步开始所有计算。因此,例如,您可以执行以下操作:
// For lack of a better name
def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] =
Enumerator.flatten(
eventuallyList map { list =>
enumerateM(list map f)
}
)当我偶然发现这个帖子时,后一种方法实际上就是我要找的。希望对某些人有帮助!:)
发布于 2013-03-21 19:32:05
您可以使用Java Executor Completeion (JavaDoc)构建一个。其思想是使用创建一系列新的期货,每个期货都使用ExecutorCompletionService.take()来等待下一个结果。每一个未来都会开始,当之前的未来有它的结果。
但请注意,这可能不是那么有效,因为许多同步都在幕后进行。使用一些并行的map reduce进行计算(例如,使用Scala的ParSeq)并让枚举器等待完整的结果可能会更有效。
发布于 2013-03-29 00:24:45
警告:应答前未编译
下面这样的内容如何:
def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] {
def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] =
Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) }
}https://stackoverflow.com/questions/15543261
复制相似问题