首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为fs2.Stream的所有元素并行调度计算

为fs2.Stream的所有元素并行调度计算
EN

Stack Overflow用户
提问于 2020-07-14 17:00:28
回答 2查看 1.2K关注 0票数 2

我有一个由一些元素(可能是无限的)组成的fs2.Stream,我希望同时为流的所有元素安排一些计算。这是我试过的

代码语言:javascript
复制
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

程序输出如下所示

代码语言:javascript
复制
1
1
1
etc...

这不是我们所期望的。我想为原始流的所有元素交错预定的计算,但不要等到第一个流结束(由于无限调度而永远不会发生这种情况)。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-07-14 17:49:13

代码语言:javascript
复制
val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.second) >> IO(println(id))
} yield res

val stream =  str.parEvalMapUnordered(5)(identity)

stream.compile.drain.unsafeRunSync()

代码语言:javascript
复制
 val stream = Stream.emits(List(1, 5, 7))
   .map { id => 
     Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
   .parJoinUnbounded

stream.compile.drain.unsafeRunSync()
票数 1
EN

Stack Overflow用户

发布于 2020-07-14 17:46:43

根据@KrzysztofAtłasik和@LuisMiguelMejíaSuárez给出的提示,我刚刚想出了一个解决方案:

代码语言:javascript
复制
val originalStream = fs2.Stream.emits(List(1, 2))

val scheduledComputation = originalStream.covary[IO].map({ id =>
        fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten

@KrzysztofAtłasik在注释中提出的与交错的_ <- fs2.Stream.awakeEvery[IO](1.second)和ł的解决方案也有效,但它不允许以自己的方式安排每个元素。

要将元素并发调度为elementValue秒,可以执行以下操作:

代码语言:javascript
复制
val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
                                 //id.seconds
        fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62900264

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档