首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Spark Structured Streaming中,在水印间隔之后一次写入记录

在Spark Structured Streaming中,在水印间隔之后一次写入记录
EN

Stack Overflow用户
提问于 2018-03-30 08:46:48
回答 1查看 226关注 0票数 1

我有以下疑问:

代码语言:javascript
复制
val ds = dataFrame
  .filter(! $"requri".endsWith(".m3u8"))
  .filter(! $"bserver".contains("trimmer"))
  .withWatermark("time", "120 seconds")
  .groupBy(window(dataFrame.col("time"),"60 seconds"),col("channelName"))
  .agg(sum("bytes")/1000000 as "byte_count")

如何实现foreach编写器,以使其处理方法在每个水印间隔仅触发一次。也就是说,在前面的示例中,我将获得以下内容

代码语言:javascript
复制
10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)
EN

回答 1

Stack Overflow用户

发布于 2018-03-30 13:15:31

要为每个水印间隔触发一次process方法,可以使用ProcessingTime("120 seconds")。如下所示:

代码语言:javascript
复制
val query = ds.writeStream
              .format("console")
              .trigger(Trigger.ProcessingTime("120 seconds"))
              .start()

流查询的触发设置定义了流数据处理的时序,即该查询是作为具有固定批处理间隔的微批查询执行还是作为连续处理查询执行。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49567007

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档