在我的项目中,我在RDD的操作和转换中执行一些副作用。我想测试我的业务逻辑是否工作,即使Spark engine不得不重试某些分区的计算。所以我试着在计算过程中模拟失败。
object Test extends App {
val conf = new SparkConf()
conf.setMaster("local[4]")
conf.setAppName(getClass.getName)
val sc = new SparkContext(conf)
try {
val data = sc.parallelize(1 to 10)
val res = data.map(n => {
if (TaskContext.get().attemptNumber() == 0 && n==5) {
sys.error("boom!")
}
n * 2
}).collect()
}
finally {
sc.stop()
}
}但它不起作用:异常被传播到驱动程序。似乎Spark只尝试故障转移它的内部错误。有什么方法可以测试它吗?
发布于 2017-08-01 18:10:19
我想说的是,你很可能会在你的程序和对spark容错计算机制的理解上犯错误。
首先,请参考scala doc,特别是这个函数
def error(message: String): Nothing
使用提供的消息RuntimeException抛出一个新的。
如果您在程序中插入此代码,而不使用catch exception语句,则调用此函数将导致运行时异常,并将使当前进程终止。
这与spark的容错计算机制无关,只是与它运行的进程和操作系统有关!没有捕获的运行时异常将导致控制流(aka.current进程)终止。
但是,的容错计算机制是什么?
它是关于spark内部,它使用多个复制来保证计算安全,一个特定的spark应用的每个任务都可能崩溃,有很多可能的原因,比如网络io故障。
但是,通过使用一个巧妙的技巧,驱动程序节点维持了rdd的依赖关系。然后,可以使用驱动器节点的谱系从原始数据集重新计算任何分区(aka.task)。
它是关于应用程序(Spark)级别的,而不是os +进程级别的。
https://stackoverflow.com/questions/45433304
复制相似问题