从下面URL中的“并行流中的水印”一节中,我们知道“操作符的当前事件时间是其输入流的事件时间的最小值”time.html
现在我们以窗口(1)实例的事件时间为例,并且我们知道事件时间是14(min(29,14)),但是如果发生以下序列水印事件会发生什么呢?
如果水印事件29在水印事件14之前到达窗口(1),会发生什么情况?
例如,假设水印事件29首先到达窗口(1)实例,由于水印14事件尚未到达,所以首先将窗口(1)实例的事件时间设置为29,然后假设水印14事件也到达窗口(1)实例,然后将窗口(1)实例的事件时间设置为14?(如果是这样的话,那么窗口(1)的事件时间将从29更改为14,变小),还假设源(2)生成水印39,然后到达窗口(1)实例,那么窗口(1)实例的事件时间将设置为29或39?
发布于 2018-06-01 15:07:03
最后,我还从源中得到了答案,就像David说的,“窗口的水印将留在Long.MIN_VALUE,直到两个输入流都有更大的值。”
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}发布于 2018-06-01 09:33:02
Such as suppose the watermark event 29 arrives at the window(1) instance
firstly, as the watermark 14 event hasn't arrived it, so the event time of
window(1) instance was set to 29 firstly ...这不对。在第一个合适的水印到达之前,使用Long.MIN_VALUE的占位符值。因此,窗口的水印将保留在Long.MIN_VALUE,直到两个输入流的更大值到达为止。
发布于 2018-05-31 17:02:34
简单地说,不是,在这种情况下,窗口不会变小(实际上可能会抛出异常)。
这就是BoundedOutOfOrderness水印提取器发挥作用的地方。使用它,您可以配置“无序”时间戳可能是什么样子,它将消除这些差异。默认情况下,使用AscendingTimestamp提取器,接收不正常的时间戳实际上是错误的。
此外,还有“允许延迟”的概念,它定义了在您收到的时间戳低于当前水印的情况下发生了什么。
例如,如果您知道您的数据源可能有60秒的抖动(由于处理时间延迟、地理距离等原因),您可以使用有界的无序提取器,其值为(TimeUnit.SECONDS, 60),这将有效地将整个窗口移回60年代。这将允许元素在60年代之间以任何顺序排列。
但是,如果您实际上希望元素完全按照顺序或以非常小的抖动出现,但您希望接受后期的元素进行处理,则可以使用允许迟到设置来定义当这些元素进入时流程应该如何运行。默认情况下,Flink会简单地删除它们,但是您可以配置一个时间段,Flink将为每个进来的元素重新启动您的窗口。
从根本上说,所有这些都取决于您的特定情况,以及您期望数据如何进入,以及如何处理延迟元素。Flink几乎允许任何组合在这里的设置。
https://stackoverflow.com/questions/50626964
复制相似问题