首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何保存spark结构化流媒体中水印丢弃的记录

如何保存spark结构化流媒体中水印丢弃的记录
EN

Stack Overflow用户
提问于 2020-02-27 00:42:13
回答 2查看 316关注 0票数 2

水印允许自动丢弃Apache Spark结构化流中的旧状态数据。在structured-streaming-programming-guide.md中,字数计数示例演示了水印如何轻松地丢弃系统中较晚到达的记录或事件。( https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md )

Words.withWatermark(“时间戳”,"10分钟“)

有没有办法保存通过在磁盘或表中添加水印而丢失或丢弃的记录?

EN

回答 2

Stack Overflow用户

发布于 2020-03-02 13:07:26

是的,spark没有追踪这些records.But的功能,flink做到了!

票数 1
EN

Stack Overflow用户

发布于 2021-03-10 17:43:32

通过支付一些性能开销,应该可以使用查询侦听器+广播变量和过滤器函数。一些类似的东西:

代码语言:javascript
复制
class WaterMark extends Serializable {

 var ws: Long = 0L;
 def set(value: Long) : Unit = {
   ws = value
 }
def get(): Long = { ws}
}

var currentWs = spark.sparkContext.broadcast[WaterMark](new WaterMark) 
 
 df.filter(row => {
    if(row.get("timestamp") < currentWs.value.ws){
   //this will be filtered by watermark. we can persist it using custom method}
   .........................
  
  //Not filtering the row as that would be done by watermarking 
  true
  })   
..............................

class QueryListener (currentWs: Broadcast[WaterMark]) extends StreamingQueryListener {

import java.util.Locale

val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
format.setTimeZone(TimeZone.getTimeZone("UTC"))
...........................
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {

//un-persist the broadcast var so that it can be updated with next batch watermark
currentWs.unpersist(true)
currentWs.value.set(format.parse(event.progress.eventTime.get("watermark")).getTime)
  println("Listener: " + currentWs.value.ws)

 }
......................
}

备注:我自己还没有端到端地尝试过它,并且它不能处理由于失败或代码更改而重新启动查询时的情况(检查点目录/提交文件夹来抢救??)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60418632

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档