我正在努力学习Scala,并且玩得很开心,但我遇到了这个经典的问题。它让我想起了NodeJS早期的许多嵌套回调地狱。
下面是我在psuedocode中的程序:
获取一个S3桶列表的任务。任务一完成后,
H 210H 111列出每个S3中的所有对象--H 214G 215在某种程度上,我得到了一个类型:Task[Iterator[Task[List[Bucket]]]]
实质上:
外部任务是列出所有S3桶的第一步,然后内部Iterator/ task /List试图对返回列表的任务进行批处理。
我希望有一些方法可以移除/扁平外部任务,以到达Iterator[Task[List[Bucket]]]。
当我试图将我的处理分解成步骤时,深度嵌套会使我做许多嵌套映射。这是正确的做法,还是有更好的方法来处理这种嵌套?
发布于 2020-05-16 18:51:06
在这种情况下,我建议使用Monix作为F的FS2:
import cats.implicits._
import monix.eval._, monix.execution._
import fs2._
// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String
// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
// checking region, filtering and listing on batches of 10
bucketRegion(name).flatMap {
case "my-region" => listObject(name)
case _ => Task.pure(List.empty)
}
}
.foldMonoid // combines List[S3Object] together
.compile.lastOrError // turns into Task with result
.map(list => println(s"Result: $list"))
.onErrorHandle { case error: Throwable => println(error) }
.runToFuture // or however you handle it下面的FS2使用cats.effect.IO或Monix,或者任何您想要的东西,只要它提供猫的效果类型类。它构建了一个很好的、功能强大的DSL来设计数据流,这样您就可以使用没有Akka流的反应性流。
这里有一个小问题,就是我们同时打印所有的结果,如果有更多的结果比内存所能处理的要多的话,这可能是个坏主意--我们可以分批打印(不确定这是不是您想要的),或者进行过滤和打印单独的批次。
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
bucketRegion(name).map(name -> _)
}
.collect { case (name, "my-region") => name }
.parEvalMap(10) { name =>
listObject(name).map(list => println(s"Result: $list"))
}
.compile
.drain虽然这些在裸Monix中都不是不可能的,但是FS2使这些操作更易于编写和维护,因此您应该能够更容易地实现您的流。
https://stackoverflow.com/questions/61841188
复制相似问题