我有一个火花工作,为卡夫卡数据的结构化流。基本代码如下。
val rules_monitoring_stream = rules_imsi_df.writeStream
.outputMode("append")
.format("memory")
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty) {
printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
batchDF.show()
batchDF.persist()
// ... Processing batchDF and populate a static dataframe
batchDF.unpersist()
}
}
.start()
while(rules_monitoring_stream.isActive) {
Thread.sleep(240000)
// Periodically load data from database
}其基本思想是在120秒的窗口中对kafka数据进行流,处理微批处理数据并填充静态数据。
根据我的理解,通过这个设计,微批应该每隔120秒到达一次,而batchDF包含在这个时间窗口中摄入的数据。
但是,基于我对printf语句的微批到达时间的监控。我找到了下面的输出。
At 1594968139, the microbatch has 110 records
At 1594968242, the microbatch has 118 records
At 1594968380, the microbatch has 243 records
At 1594968483, the microbatch has 117 records
At 1594968602, the microbatch has 59 records微批处理的相邻到达时间之间的差值似乎不太准确,为120秒。有时,它超过120秒,有时,它是少于120秒。
正常吗?如何理解Trigger.ProcessingTime指定的时间?如何获得更精确的时间窗?
此外,由于这个不准确,会否造成一些资料损失的微信?我的意思是,有些数据从来没有被任何微批次捕获过?
发布于 2020-12-08 06:47:25
“
”正常吗?如何理解Trigger.ProcessingTime指定的时间?
是的,这很正常。请记住,配置的触发器触发流作业的整个查询,而不仅仅是foreachBatch方法。由于您通常有不同数量的记录和不同的处理时间,所以在foreachBatch调用中的实际写入也不会是固定的时间。
“如何获得更精确的时间窗口?”
触发器的工作非常准确,您可以考虑另一种测量触发时间的方法,例如在查询开始时(在readStream调用之后)检查时间。
“
”此外,由于这种不精确性,会不会给这些微批造成一些数据丢失?我的意思是,有些数据从未被任何微批捕获过?
没有,没有数据丢失。
https://stackoverflow.com/questions/62948735
复制相似问题