我有一个包含一个terasort的火花工作台,当数据只有几百GB时,它就能正常运行,但是当我生成更多的数据(例如1 TB )时,它在一些step.The中出错了,下面是我的代码:
import org.apache.spark.rdd._
import org.apache.spark._
import org.apache.spark.SparkContext._
object ScalaTeraSort{
def main(args: Array[String]){
if (args.length < 2){
System.err.println(
s"Usage: $ScalaTeraSort <INPUT_HDFS> <OUTPUT_HDFS>"
)
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("ScalaTeraSort")
val sc = new SparkContext(sparkConf)
val file = sc.textFile(args(0))
val data = file.map(line => (line.substring(0, 10), line.substring(10)))
.sortByKey().map{case(k, v) => k + v}
data.saveAsTextFile(args(1))
sc.stop()
}}
该代码主要包括三个步骤: sortByKey、map和saveAsTextFile。前两步似乎没有错,但说到第三步,它总是出错,然后再试第二步。第三步出错是因为"FetchFailed(BlockManagerId(40,sr232,44815,0),shuffleId=0,mapId=11825,reduceId=0)"“
发布于 2014-10-09 06:54:19
我发现了原因,根本的问题是:java.io.IOException: sendMessageReliably失败了,因为在60秒内没有收到ack
也就是说,您必须将属性"spark.core.connection.ack.wait.timeout“设置为更大的值,默认为60秒。否则,舞台就会失败,因为很长时间没有反应。
https://stackoverflow.com/questions/26247654
复制相似问题