In Spark Streaming it is possible (and mandatory if you are going to use stateful operations) to set the StreamingContext to perform checkpoints into a reliable data store (S3, HDFS, ...) of both: metadata and the DStream lineage. As described here, to set the output data store you need to call yourSparkStreamingCtx.checkpoint(datastoreURL).
On the other hand, it is possible to set a lineage checkpoint interval for each DataStream by calling checkpoint(timeInterval). In fact, it is recommended to set the lineage checkpoint interval to between 5 and 10 times the DataStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
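A minimal sketch of both settings, assuming a local word-count style stream (the application name, socket source, checkpoint URL and the 2-second batch interval are all placeholders, not from the original question):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointSketch").setMaster("local[2]")
// Batch (and therefore default sliding) interval of 2 seconds
val ssc = new StreamingContext(conf, Seconds(2))

// 1) Context-level checkpointing: point the context at a reliable store
ssc.checkpoint("hdfs://namenode:8020/checkpoints/myApp") // placeholder URL

// 2) Lineage checkpointing: set an explicit interval on a DStream,
//    here 5x the 2-second sliding interval, as the docs recommend
val lines = ssc.socketTextStream("localhost", 9999)
lines.checkpoint(Seconds(10))
```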
My question is:

When the streaming context is set up to perform checkpointing, and checkpoint(timeInterval) is not called on any DStream, is lineage checkpointing enabled for all DStreams with a default checkpointInterval equal to batchInterval? Or is, on the contrary, only metadata checkpointing enabled?
Posted on 2016-01-01 16:55:26
Checking the Spark code (v1.5), I have found that DStream checkpointing is enabled under two circumstances:

By an explicit call to their checkpoint method (not StreamingContext's):
/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}

Whenever the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true); in that case, at initialization:
private[streaming] def initialize(time: Time) {
  ...
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...

The first case is obvious. Performing a naive analysis of the Spark Streaming code:
grep "val mustCheckpoint = true" $(find -type f -name "*.scala")
> ./org/apache/spark/streaming/api/python/PythonDStream.scala: override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true

I can find that, in general (ignoring PythonDStream), StreamingContext checkpointing only enables lineage checkpoints for StateDStream and ReducedWindowedDStream instances. These instances are the results of the transformations (respectively) updateStateByKey and reduceByKeyAndWindow (with an inverse reduce function):
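To illustrate, a hedged sketch (the socket source, port, checkpoint URL and window durations are placeholders) of the two transformations that produce those DStream subclasses, and which therefore get lineage checkpointing enabled automatically once the context checkpoint directory is set:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MustCheckpointSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/myApp") // placeholder URL

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// updateStateByKey produces a StateDStream (mustCheckpoint = true)
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
}

// reduceByKeyAndWindow with an inverse function produces a
// ReducedWindowedDStream (mustCheckpoint = true)
val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(20), Seconds(4))
```

Since neither resulting stream calls checkpoint(interval) explicitly, initialize would pick the default interval for them: the sliding duration scaled up to at least 10 seconds, per the code shown above.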
https://stackoverflow.com/questions/34550374