首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink如何设置初始水印

Flink如何设置初始水印
EN

Stack Overflow用户
提问于 2018-02-02 13:43:55
回答 1查看 2K关注 0票数 0

我正在使用Flink 1.3.2和scala构建一个流媒体应用程序,我的Flink应用程序将监控一个文件夹,并将新文件流式传输到管道中。文件中的每个记录都有一个关联的时间戳。我想使用这个时间戳作为事件时间,并使用AssignerWithPeriodicWatermarks[T]构建水印,我的水印生成器如下所示:

代码语言:javascript
复制
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
    val maxTimeLag = 6 * 3600000L // 6 hours
    override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
        val timestampString = element.getTimestamp
    }
    override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
  }

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)

val activity = stream
      .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
      .map { line =>
        new tuple.Tuple2(line.id, line.count)
      }.keyBy(0).addSink(...)

但是,因为我的文件夹中有一些旧数据,所以我不想处理它们。旧文件中记录的时间戳大于6小时,应该比水印早。但是,当我开始运行它时,我仍然可以看到创建了一些初始输出。我想知道水印的初始值是如何设置的,是在第一个间隔之前还是之后?这可能是我误解了这里的一些东西,但需要一些建议。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-02 15:46:22

您已经展示过的管道中没有关心时间的运算符--没有窗口,没有ProcessFunction计时器--所以每个流元素都将畅通无阻地通过并被处理。如果您的目标是跳过延迟的元素,则需要引入一些(以某种方式)将事件时间戳与当前水印进行比较的内容。

您可以通过在keyBy和接收器之间引入一个步骤来实现这一点,如下所示:

代码语言:javascript
复制
...
.keyBy(0)
.process(new DropLateEvents())
.addSink(...)

public static class DropLateEvents extends ProcessFunction<...> {
    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {
        TimerService timerService = context.timerService();
        if (context.timestamp() > timerService.currentWatermark()) {
           out.collect(event);
        }
    }
}

做完这些之后,你关于初始水印的问题就变得有意义了。对于周期性水印,初始水印是Long.MIN_VALUE,因此在发出第一个水印之前不会考虑延迟,这将在操作10秒后发生(根据您设置的自动水印间隔)。

如果您想更详细地了解周期性水印是如何生成的,相关代码为here

如果您希望避免在前10秒内处理延迟元素,您可以完全忘记使用事件时间和水印,只需修改上面显示的processElement方法,将事件时间戳与System.currentTimeMillis() - maxTimeLag而不是与当前水印进行比较。另一种解决方案是使用带标点的水印,并在第一个事件中发出水印。

或者更简单地说,您可以检测并删除flatMap或过滤器中的延迟事件,因为您定义的是相对于System.currentTimeMillis()而不是相对于水印的延迟。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48576391

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档