首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何按键对Fs2流进行分区,分别对每个分区进行转换?

如何按键对Fs2流进行分区,分别对每个分区进行转换?
EN

Stack Overflow用户
提问于 2018-12-21 14:18:20
回答 3查看 752关注 0票数 1

我想要实现的,例如,给定的数据:

代码语言:javascript
复制
time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5

和转型:

代码语言:javascript
复制
stream.keyBy(_.part).scan(0)((s, d) => s + d)

得到:

代码语言:javascript
复制
0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12

我尝试过使用groupAdjacentBy对其进行分区,但是它变得太复杂了,因为我需要用键在每个块之间保留复杂的状态。我想知道是否有类似于Flink DataStream.keyBy的东西?或者更简单的方法来实现它?

EN

回答 3

Stack Overflow用户

发布于 2018-12-24 14:39:42

好的,我发现了有趣的解决方案 (虽然不能是flatten )

票数 1
EN

Stack Overflow用户

发布于 2019-02-02 04:52:21

如前所述,这个问题可以通过扫描操作本身的“分区”来解决:

代码语言:javascript
复制
import cats.implicits._
import cats.effect.IO
import fs2._

case class Element(time: Long, part: Symbol, value: Int)

val elements = Stream(
  Element(0, 'a, 3),
  Element(1, 'a, 4),
  Element(2, 'b, 10),
  Element(3, 'b, 20),
  Element(3, 'a, 5)
)

val runningSumsByPart = elements
  .scan(Map.empty[Symbol, Int] -> none[Element]) {
    case ((sums, _), el@Element(_, part, value)) =>
      val sum = sums.getOrElse(part, 0) + value
      (sums + (part -> sum), el.copy(value = sum).some)
  }
  .collect { case (_, Some(el)) => el }

runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()

产出:

元素(0,'a,3) 元素(1,'a,7) 元素(2,'b,10) 元素(3,'b,30) 元素(3,'a,12)

票数 0
EN

Stack Overflow用户

发布于 2020-12-11 00:21:43

我做了这样的事。先分开,然后合并。我还不知道如何返回2流。我只知道如何在一个地方处理它们,然后将它们合并在一起。

代码语言:javascript
复制
    val notEqualS = in
      .filter(_.isInstanceOf[NotEqual])
      .map(_.asInstanceOf[NotEqual])
      ...

    val invalidS = in
      .filter(_.isInstanceOf[Invalid])
      .map(_.asInstanceOf[Invalid])
      ...

    notEqualS.merge(invalidS)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53886292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档