我有两个流A和B。
我开始同时摄入A和B。
流A只在每分钟的59秒获得一条记录。
流B获取一分钟内的任何一秒的记录。
我希望处理两个流是同步的。
示例:从流A在10:01:59之后,我将在10:02:59收到一条记录,直到10:02:59我也不想从流B读取任何内容。
这可以在Flink中实现吗?
发布于 2021-01-21 09:54:34
在Flink中,您不能不从流中读取记录,但您可以从流中删除(或以状态保存)记录。因此,您可以连接这两个流,并使用CoFlatMap进行处理。当您从流A获得一条记录时,将其保存为state。当您从流B获得记录时,根据流A的状态决定如何处理它。
发布于 2021-01-21 15:18:01
Flink使用基于推送的模型(随着源和汇点被重构为基于拉动的模型,这应该很快就会改变)来处理下游的元素。这意味着你不能“等到事件到达时才能获取更多的数据”,同时你必须在某个操作员状态下缓冲这些数据。Flink提供了various state backends供您使用。
为了给出一些关于kkruglers answer的可视化,给定两个流,这是我们如何在逻辑上连接它们,然后使用一个ListState,当另一个元素到达时检索其中一个:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
object Test {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()
val streamA = env.fromCollection(List(1, 2, 3))
val streamB = env.fromCollection(List("a", "b", "c"))
streamA
.connect(streamB)
.process {
new CoProcessFunction[Int, String, (Int, String)] {
var myStateA: ListState[Int] = _
override def open(parameters: Configuration): Unit = {
myStateA = getRuntimeContext.getListState[Int](
new ListStateDescriptor[Int]("my_state", classOf[Int])
)
}
override def processElement1(
value: Int,
ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
out: Collector[(Int, String)]
): Unit = {
myStateA.add(value)
}
override def processElement2(
value: String,
ctx: CoProcessFunction[Int, String, (Int, String)]#Context,
out: Collector[(Int, String)]
): Unit = {
val list = myStateA.get().iterator().asScala.toList
val intFromState = list.headOption
intFromState match {
case Some(myInt) =>
out.collect((myInt, value))
case None => ()
}
myStateA.update(list.tail.asJava)
}
}
}
}
}注意:这个实现被简化了。这里不能保证元素的到达顺序,您需要将其添加到状态和实现中。您还可以使用Timers,从而为每个进入流的事件注册一个计时器,作为新数据到达时的指示。
https://stackoverflow.com/questions/65819467
复制相似问题