有1个高吞吐量Kafka流,定义如下
val stream: DataStream[A] = flinkEnv
.addSource(kafkaStreamSource)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withIdleness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedAt
}
}
)
)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(.......)上述窗口操作符的水印可以正确转发。
上面窗口操作符中的DataStream[A]需要用一些保存在一些S3文件中的信息来丰富。S3文件很少更新。
将S3文件作为流读取,然后广播以丰富DataStream[A]中的元素。
val textInputFormat = new TextInputFormat(new Path("s3 path...."))
val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv.readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
.map(s3Element => {
EnrichWithElement(.....)
})
.broadcast(new MapStateDescriptor......)然后连接这两个流,以用EnrichWithElement类型的元素丰富所有A类型的元素。
class EnrichedAProcess
extends BroadcastProcessFunction[A,EnrichWithElement,EnrichedAElement] {
override def processElement(
value: A,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.....
out.collect(EnrichedAElement(....))
}
override def processBroadcastElement(
value: EnrichWithElement,
ctx: Context,
out: Collector[EnrichedAElement]): Unit = {
.........
}
}
stream
.connect(enrichWithElements)
.process(new EnrichedAProcess)EnrichedAProcess有2个输入。其中之一是不断地转发水印,但广播的流没有任何时间信息或水印。这导致EnrichedAProcess's水印根本不转发,因为它的一个输入没有传入水印。
有没有办法指定EnrichedAProcess's水印只依赖于非广播的输入?
发布于 2021-10-29 09:27:08
操作员将多个输入通道将其自己的水印设置为从所有活动通道接收的最新水印的最小值。
您可以做的是对始终返回MAX_WATERMARK作为水印的广播流应用WatermarkStrategy。(您不需要担心为该流分配时间戳。)
https://stackoverflow.com/questions/69765403
复制相似问题