首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于DStreams的火花流检查点

用于DStreams的火花流检查点
EN

Stack Overflow用户
提问于 2015-12-31 18:33:19
回答 1查看 8.8K关注 0票数 13

在星火流中,设置StreamingContext以将检查点设置为可靠的数据存储(S3、HDFS、.)是可能的(如果要使用有状态操作,也是强制性的)。(和):

  • 元数据
  • DStream谱系

正如所描述的here,要设置需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)的输出数据存储

另一方面,可以通过在每个DataStream上调用checkpoint(timeInterval)来设置它们的沿袭检查点间隔。实际上,建议将沿袭检查点间隔设置为DataStream滑动间隔的5到10倍:

dstream.checkpoint(checkpointInterval)。通常,DStream的5-10滑动间隔的检查点间隔是一个很好的尝试设置。

我的问题是:

当流上下文被设置为执行检查点并且没有称为时,是否为默认checkpointInterval等于batchInterval的所有数据流启用了沿袭检查点?或者,相反,只有元数据检查点启用了什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-01-01 16:55:26

检查星火代码(v1.5)我发现DStream的检查点是在两种情况下启用的:

通过显式调用他们的checkpoint StreamContext**'s):**方法(而不是方法)来实现

代码语言:javascript
复制
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
        throw new UnsupportedOperationException(
            "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
}

只要具体的‘DStream’子类重写了 mustCheckpoint 属性(将其设置为 true**):** ),则初始化时的

代码语言:javascript
复制
 private[streaming] def initialize(time: Time) {
  ...
  ...   
   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }
  ...

第一种情况很明显。对星火流代码执行天真的分析:

代码语言:javascript
复制
grep "val mustCheckpoint = true" $(find -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override     val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

我可以发现,通常(忽略PythonDStream),StreamingContext检查点只为StateDStreamReducedWindowedDStream实例启用沿袭检查点。这些实例是转换(分别和)的结果:

  • updateStateByKey:,即通过几个窗口提供状态的流。
  • reduceByKeyAndWindow
票数 11
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34550374

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档