首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >扇入/扇出与Monix并发

扇入/扇出与Monix并发
EN

Stack Overflow用户
提问于 2020-05-16 17:52:45
回答 1查看 183关注 0票数 0

我正在努力学习Scala,并且玩得很开心,但我遇到了这个经典的问题。它让我想起了NodeJS早期的许多嵌套回调地狱。

下面是我在psuedocode中的程序:

获取一个S3桶列表的任务。任务一完成后,

  1. 要对桶的处理进行批次处理。每个批的
  2. 获取每个桶的区域。
  3. 过滤出不在该区域的桶。H 210H 111列出每个S3中的所有对象--H 214G 215

在某种程度上,我得到了一个类型:Task[Iterator[Task[List[Bucket]]]]

实质上:

外部任务是列出所有S3桶的第一步,然后内部Iterator/ task /List试图对返回列表的任务进行批处理。

我希望有一些方法可以移除/扁平外部任务,以到达Iterator[Task[List[Bucket]]]

当我试图将我的处理分解成步骤时,深度嵌套会使我做许多嵌套映射。这是正确的做法,还是有更好的方法来处理这种嵌套?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-16 18:51:06

在这种情况下,我建议使用Monix作为F的FS2:

代码语言:javascript
复制
import cats.implicits._
import monix.eval._, monix.execution._
import fs2._

// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String

// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)

Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    // checking region, filtering and listing on batches of 10
    bucketRegion(name).flatMap {
      case "my-region" => listObject(name)
      case _           => Task.pure(List.empty)
    }
  }
  .foldMonoid // combines List[S3Object] together
  .compile.lastOrError // turns into Task with result
  .map(list => println(s"Result: $list"))
  .onErrorHandle { case error: Throwable => println(error) }
  .runToFuture // or however you handle it

下面的FS2使用cats.effect.IO或Monix,或者任何您想要的东西,只要它提供猫的效果类型类。它构建了一个很好的、功能强大的DSL来设计数据流,这样您就可以使用没有Akka流的反应性流。

这里有一个小问题,就是我们同时打印所有的结果,如果有更多的结果比内存所能处理的要多的话,这可能是个坏主意--我们可以分批打印(不确定这是不是您想要的),或者进行过滤和打印单独的批次。

代码语言:javascript
复制
Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    bucketRegion(name).map(name -> _)
  }
  .collect { case (name, "my-region") => name }
  .parEvalMap(10) { name =>
    listObject(name).map(list => println(s"Result: $list"))
  }
  .compile
  .drain

虽然这些在裸Monix中都不是不可能的,但是FS2使这些操作更易于编写和维护,因此您应该能够更容易地实现您的流。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61841188

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档