首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花结构化流Trigger.ProcessingTime定时精度的研究

火花结构化流Trigger.ProcessingTime定时精度的研究
EN

Stack Overflow用户
提问于 2020-07-17 06:58:59
回答 1查看 3.1K关注 0票数 0

我有一个火花工作,为卡夫卡数据的结构化流。基本代码如下。

代码语言:javascript
复制
val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if(!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      // ... Processing batchDF and populate a static dataframe
      batchDF.unpersist()
    }
  }
  .start()

while(rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  // Periodically load data from database
}

其基本思想是在120秒的窗口中对kafka数据进行流,处理微批处理数据并填充静态数据。

根据我的理解,通过这个设计,微批应该每隔120秒到达一次,而batchDF包含在这个时间窗口中摄入的数据。

但是,基于我对printf语句的微批到达时间的监控。我找到了下面的输出。

代码语言:javascript
复制
At 1594968139, the microbatch has 110 records
At 1594968242, the microbatch has 118 records
At 1594968380, the microbatch has 243 records
At 1594968483, the microbatch has 117 records
At 1594968602, the microbatch has 59 records

微批处理的相邻到达时间之间的差值似乎不太准确,为120秒。有时,它超过120秒,有时,它是少于120秒。

正常吗?如何理解Trigger.ProcessingTime指定的时间?如何获得更精确的时间窗?

此外,由于这个不准确,会否造成一些资料损失的微信?我的意思是,有些数据从来没有被任何微批次捕获过?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-08 06:47:25

”正常吗?如何理解Trigger.ProcessingTime指定的时间?

是的,这很正常。请记住,配置的触发器触发流作业的整个查询,而不仅仅是foreachBatch方法。由于您通常有不同数量的记录和不同的处理时间,所以在foreachBatch调用中的实际写入也不会是固定的时间。

“如何获得更精确的时间窗口?”

触发器的工作非常准确,您可以考虑另一种测量触发时间的方法,例如在查询开始时(在readStream调用之后)检查时间。

”此外,由于这种不精确性,会不会给这些微批造成一些数据丢失?我的意思是,有些数据从未被任何微批捕获过?

没有,没有数据丢失。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62948735

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档