我需要一个组合器来解决以下问题:
test("groupUntil") {
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupUntil(_ == 1)
val result =
for {
group <- grouped
element <- group.fold(0)(_ + _)
} yield element
assertEquals(result.compile.toList.unsafeRunSync(), List(10, 9, 5))
}内在的溪流也必须懒惰。(注意,groupUntil是我想要的假想组合器)。
注意:当内部流到达原始流时,我必须立即处理它们的每个元素,也就是说,我不能等待对整个组进行分组。
发布于 2021-06-16 09:40:29
的一种方法是在fold函数中使用Stream作为容器,您可以在这里实现惰性:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val acc: Stream[IO, Stream[IO, Int]] = Stream.empty
val grouped: Stream[IO, Stream[IO, Int]] = s.fold(acc) {
case (streamOfStreams, nextInt) if nextInt == 1 =>
Stream(Stream(nextInt).covary[IO]).append(streamOfStreams)
case (streamOfStreams, nextInt) =>
streamOfStreams.head.map(_.append(Stream(nextInt).covary[IO])) ++
streamOfStreams.tail
}.flatten
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync().reverse, List(10, 9, 5))小心,结果您会得到相反的流,因为使用流的last元素不是个好主意,更好的方法是使用head,但它要求我们在处理结束时列出reverse列表。
的另一种方式是某些predicate使用groupAdjacentBy和group元素
val groupedOnceAndOthers: fs2.Stream[IO, (Boolean, Chunk[Int])] =
s.groupAdjacentBy(x => x == 1)在这里,您将看到有对的组:
(true,Chunk(1)), (false,Chunk(2, 3, 4)),
(true,Chunk(1)), (false,Chunk(2, 3, 3)),
(true,Chunk(1)), (false,Chunk(2, 2))要将组与1连接在一起,我们可以使用chunkN (如scala中的grouped )和map结果来消除布尔对和flatMap,使Chunk变得平坦:
val grouped = groupedOnceAndOthers
.chunkN(2, allowFewer = true)
.map(ch => ch.flatMap(_._2).toList)结果分组为:List(1, 2, 3, 4) List(1, 2, 3, 3) List(1, 2, 2)
全工作样本:
import cats.effect.IO
import fs2.Stream
val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupAdjacentBy(x => x == 1)
.chunkN(2, allowFewer = true)
.map(ch => Stream.fromIterator[IO](ch.flatMap(_._2).iterator))
val result: Stream[IO, IO[Int]] = for {
group <- grouped
element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync(), List(10, 9, 5))https://stackoverflow.com/questions/67998771
复制相似问题