首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >后期数据处理

后期数据处理
EN

Stack Overflow用户
提问于 2020-01-22 04:37:15
回答 1查看 872关注 0票数 3

错过窗口和.withAllowedLateness周期的后期数据将从管道中删除,如文档所示的这里

关于这种行为,我有几个问题:

  1. 如何处理从管道中丢弃的后期数据?我们可以添加默认行为吗?比如说,所有迟来的数据都应该记录在某个地方,就像“无时无刻”的水桶?
  2. 我们能有一个度量(/Beam)来说明这些消息中有多少是由于巨大的延迟而从管道中丢弃的吗?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-22 21:47:37

  1. 通常,我们将延迟数据定义为元素,当它们到达时,我们只希望删除它们,而不想进一步处理它们。据我所知,添加额外的功能来处理这些消息需要花费大量的精力来修改Java。但是,如果您只想记录它们,这是由LateDataDroppingDoFnRunner代码完成的,这是用于从过期窗口删除数据的负责任代码:
代码语言:javascript
复制
for (WindowedValue<InputT> input : concatElements) {
  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
  if (canDropDueToExpiredWindow(window)) {
    // The element is too late for this window.
    droppedDueToLateness.inc();
    WindowTracing.debug(
        "{}: Dropping element at {} for key:{}; window:{} "
            + "since too far behind inputWatermark:{}; outputWatermark:{}",
        LateDataFilter.class.getSimpleName(),
        input.getTimestamp(),
        key,
        window,
        timerInternals.currentInputWatermarkTime(),
        timerInternals.currentOutputWatermarkTime());
  }
}

请注意,日志具有DEBUG级别,因此您可能看不到它。正如解释过的这里一样,要覆盖Dataflow中的级别,您可以使用--defaultWorkerLogLevel=DEBUG,甚至更好的是指定一个特定的类,比如--workerLogLevelOverrides={"org.apache.beam.sdk.util.WindowTracing":"DEBUG"}。明智地选择密钥可以帮助公开信息以识别丢弃的消息(即数据沿袭)。

  1. 正如在前面的片段中所看到的,droppedDueToLateness是一个计数器度量,每当我们删除一个元素:droppedDueToLateness.inc();时,它就会递增。您可以使用资源类型dataflow_job和度量custom.googleapis.com/dataflow/droppedDueToLateness的堆栈驱动程序监视它。

票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59852691

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档