首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Flink:水印不随广播流进行

Apache Flink:水印不随广播流进行
EN

Stack Overflow用户
提问于 2021-10-29 07:53:02
回答 1查看 66关注 0票数 0

有1个高吞吐量Kafka流,定义如下

代码语言:javascript
复制
val stream: DataStream[A] = flinkEnv
  .addSource(kafkaStreamSource)
  .assignTimestampsAndWatermarks(
     WatermarkStrategy
        .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
        .withIdleness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
                element.lastUpdatedAt
              }
            }
        )
  )
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce(.......)

上述窗口操作符的水印可以正确转发。

上面窗口操作符中的DataStream[A]需要用一些保存在一些S3文件中的信息来丰富。S3文件很少更新。

将S3文件作为流读取,然后广播以丰富DataStream[A]中的元素。

代码语言:javascript
复制
 val textInputFormat = new TextInputFormat(new Path("s3 path...."))
 val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv.readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
    .map(s3Element => {
      EnrichWithElement(.....)
    })
    .broadcast(new MapStateDescriptor......)

然后连接这两个流,以用EnrichWithElement类型的元素丰富所有A类型的元素。

代码语言:javascript
复制
class EnrichedAProcess
    extends BroadcastProcessFunction[A,EnrichWithElement,EnrichedAElement] {
  override def processElement(
      value: A,
      ctx: Context,
      out: Collector[EnrichedAElement]): Unit = {
      .....
      out.collect(EnrichedAElement(....))
  }

  override def processBroadcastElement(
      value: EnrichWithElement,
      ctx: Context,
      out: Collector[EnrichedAElement]): Unit = {
      .........
  }
}
stream
  .connect(enrichWithElements)
  .process(new EnrichedAProcess)

EnrichedAProcess有2个输入。其中之一是不断地转发水印,但广播的流没有任何时间信息或水印。这导致EnrichedAProcess's水印根本不转发,因为它的一个输入没有传入水印。

有没有办法指定EnrichedAProcess's水印只依赖于非广播的输入?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-29 09:27:08

操作员将多个输入通道将其自己的水印设置为从所有活动通道接收的最新水印的最小值。

您可以做的是对始终返回MAX_WATERMARK作为水印的广播流应用WatermarkStrategy。(您不需要担心为该流分配时间戳。)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69765403

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档