我有一个由一些元素(可能是无限的)组成的fs2.Stream,我希望同时为流的所有元素安排一些计算。这是我试过的
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
_ <- fs2.Stream.awakeEvery[IO](1.second)
_ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()程序输出如下所示
1
1
1
etc...这不是我们所期望的。我想为原始流的所有元素交错预定的计算,但不要等到第一个流结束(由于无限调度而永远不会发生这种情况)。
发布于 2020-07-14 17:49:13
val str = for {
id <- Stream.emits(List(1, 5, 7)).covary[IO]
res = timer.sleep(id.second) >> IO(println(id))
} yield res
val stream = str.parEvalMapUnordered(5)(identity)
stream.compile.drain.unsafeRunSync()或
val stream = Stream.emits(List(1, 5, 7))
.map { id =>
Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
.parJoinUnbounded
stream.compile.drain.unsafeRunSync()发布于 2020-07-14 17:46:43
根据@KrzysztofAtłasik和@LuisMiguelMejíaSuárez给出的提示,我刚刚想出了一个解决方案:
val originalStream = fs2.Stream.emits(List(1, 2))
val scheduledComputation = originalStream.covary[IO].map({ id =>
fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten@KrzysztofAtłasik在注释中提出的与交错的_ <- fs2.Stream.awakeEvery[IO](1.second)和ł的解决方案也有效,但它不允许以自己的方式安排每个元素。
要将元素并发调度为elementValue秒,可以执行以下操作:
val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
//id.seconds
fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flattenhttps://stackoverflow.com/questions/62900264
复制相似问题