在Flink中,我找到了两种设置水印的方法,
第一个是
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)第二个是
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)我想知道哪一个最终会生效。
发布于 2020-12-21 05:31:02
这两者之间根本没有冲突--它们是在处理不同的问题。指定的所有内容都将生效。
第一个,
env.getConfig.setAutoWatermarkInterval(5000)指定生成水印的频率(每5000毫秒生成一个水印)。如果未指定此参数,则将使用默认值200毫秒。
第二,
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)指定如何计算这些水印的细节。也就是说,它们应该由FlinkKafkaConsumer使用BoundedOutOfOrderness策略生成,具有10秒的有界延迟。WatermarkStrategy还需要一个时间戳分配器。
没有默认的WatermarkStrategy,所以如果您想使用事件时间,则需要类似于第二个代码片段的内容。
https://stackoverflow.com/questions/65383069
复制相似问题