首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于谓词将fs2流分组为子流

基于谓词将fs2流分组为子流
EN

Stack Overflow用户
提问于 2021-06-16 08:09:33
回答 1查看 395关注 0票数 0

我需要一个组合器来解决以下问题:

代码语言:javascript
复制
test("groupUntil") {
  val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
  val grouped: Stream[IO, Stream[IO, Int]] = s.groupUntil(_ == 1)

  val result =
    for {
      group   <- grouped
      element <- group.fold(0)(_ + _)
  } yield element

  assertEquals(result.compile.toList.unsafeRunSync(), List(10, 9, 5))
}

内在的溪流也必须懒惰。(注意,groupUntil是我想要的假想组合器)。

注意:当内部流到达原始流时,我必须立即处理它们的每个元素,也就是说,我不能等待对整个组进行分组。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-06-16 09:40:29

的一种方法是在fold函数中使用Stream作为容器,您可以在这里实现惰性:

代码语言:javascript
复制
import cats.effect.IO
import fs2.Stream

val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val acc: Stream[IO, Stream[IO, Int]] = Stream.empty
val grouped: Stream[IO, Stream[IO, Int]] = s.fold(acc) {
  case (streamOfStreams, nextInt) if nextInt == 1 =>
    Stream(Stream(nextInt).covary[IO]).append(streamOfStreams)
  case (streamOfStreams, nextInt) =>
    streamOfStreams.head.map(_.append(Stream(nextInt).covary[IO])) ++ 
      streamOfStreams.tail
}.flatten

val result: Stream[IO, IO[Int]] = for {
  group <- grouped
  element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync().reverse, List(10, 9, 5))

小心,结果您会得到相反的流,因为使用流的last元素不是个好主意,更好的方法是使用head,但它要求我们在处理结束时列出reverse列表。

的另一种方式是某些predicate使用groupAdjacentBy和group元素

代码语言:javascript
复制
val groupedOnceAndOthers: fs2.Stream[IO, (Boolean, Chunk[Int])] = 
  s.groupAdjacentBy(x => x == 1)

在这里,您将看到有对的组:

代码语言:javascript
复制
(true,Chunk(1)), (false,Chunk(2, 3, 4)), 
(true,Chunk(1)), (false,Chunk(2, 3, 3)), 
(true,Chunk(1)), (false,Chunk(2, 2))

要将组与1连接在一起,我们可以使用chunkN (如scala中的grouped )和map结果来消除布尔对和flatMap,使Chunk变得平坦:

代码语言:javascript
复制
val grouped = groupedOnceAndOthers
  .chunkN(2, allowFewer = true)
  .map(ch => ch.flatMap(_._2).toList)

结果分组为:List(1, 2, 3, 4) List(1, 2, 3, 3) List(1, 2, 2)

全工作样本:

代码语言:javascript
复制
import cats.effect.IO
import fs2.Stream

val s = Stream(1, 2, 3, 4, 1, 2, 3, 3, 1, 2, 2).covary[IO]
val grouped: Stream[IO, Stream[IO, Int]] = s.groupAdjacentBy(x => x == 1)
  .chunkN(2, allowFewer = true)
  .map(ch => Stream.fromIterator[IO](ch.flatMap(_._2).iterator))

val result: Stream[IO, IO[Int]] = for {
  group <- grouped
  element = group.compile.foldMonoid
} yield element
assertEquals(result.map(_.unsafeRunSync()).compile.toList.unsafeRunSync(), List(10, 9, 5))
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67998771

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档