首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Flink同步的进程2流

使用Flink同步的进程2流
EN

Stack Overflow用户
提问于 2021-01-21 08:11:18
回答 2查看 136关注 0票数 0

我有两个流A和B。

我开始同时摄入A和B。

流A只在每分钟的59秒获得一条记录。

流B获取一分钟内的任何一秒的记录。

我希望处理两个流是同步的。

示例:从流A在10:01:59之后,我将在10:02:59收到一条记录,直到10:02:59我也不想从流B读取任何内容。

这可以在Flink中实现吗?

EN

回答 2

Stack Overflow用户

发布于 2021-01-21 09:54:34

在Flink中,您不能不从流中读取记录,但您可以从流中删除(或以状态保存)记录。因此,您可以连接这两个流,并使用CoFlatMap进行处理。当您从流A获得一条记录时,将其保存为state。当您从流B获得记录时,根据流A的状态决定如何处理它。

票数 0
EN

Stack Overflow用户

发布于 2021-01-21 15:18:01

Flink使用基于推送的模型(随着源和汇点被重构为基于拉动的模型,这应该很快就会改变)来处理下游的元素。这意味着你不能“等到事件到达时才能获取更多的数据”,同时你必须在某个操作员状态下缓冲这些数据。Flink提供了various state backends供您使用。

为了给出一些关于kkruglers answer的可视化,给定两个流,这是我们如何在逻辑上连接它们,然后使用一个ListState,当另一个元素到达时检索其中一个:

代码语言:javascript
复制
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._ 

object Test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val streamA = env.fromCollection(List(1, 2, 3))
    val streamB = env.fromCollection(List("a", "b", "c"))

    streamA
      .connect(streamB)
      .process {
        new CoProcessFunction[Int, String, (Int, String)] {
          var myStateA: ListState[Int] = _

          override def open(parameters: Configuration): Unit = {
            myStateA = getRuntimeContext.getListState[Int](
              new ListStateDescriptor[Int]("my_state", classOf[Int])
            )
          }

          override def processElement1(
              value: Int,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit = {
            myStateA.add(value)
          }

          override def processElement2(
              value: String,
              ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
              out: Collector[(Int, String)]
          ): Unit = {
            val list = myStateA.get().iterator().asScala.toList
            val intFromState = list.headOption
            intFromState match {
              case Some(myInt) =>
                out.collect((myInt, value))
              case None => ()
            }

            myStateA.update(list.tail.asJava)
          }
        }
      }
  }
}

注意:这个实现被简化了。这里不能保证元素的到达顺序,您需要将其添加到状态和实现中。您还可以使用Timers,从而为每个进入流的事件注册一个计时器,作为新数据到达时的指示。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65819467

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档