我有以下疑问:
val ds = dataFrame
.filter(! $"requri".endsWith(".m3u8"))
.filter(! $"bserver".contains("trimmer"))
.withWatermark("time", "120 seconds")
.groupBy(window(dataFrame.col("time"),"60 seconds"),col("channelName"))
.agg(sum("bytes")/1000000 as "byte_count")如何实现foreach编写器,以使其处理方法在每个水印间隔仅触发一次。也就是说,在前面的示例中,我将获得以下内容
10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)发布于 2018-03-30 13:15:31
要为每个水印间隔触发一次process方法,可以使用ProcessingTime("120 seconds")。如下所示:
val query = ds.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("120 seconds"))
.start()流查询的触发设置定义了流数据处理的时序,即该查询是作为具有固定批处理间隔的微批查询执行还是作为连续处理查询执行。
https://stackoverflow.com/questions/49567007
复制相似问题