我正面临星火应用程序的问题。下面是我的代码的简化版本:
def main(args: Array[String]) {
// Initializing spark context
val sc = new SparkContext()
val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
// Getting files from TGZ archives
val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
logger.debug("Getting files from archive : "+tgzStream._1)
utils.getFilesFromTgzStream(tgzStream._2)
})
// We run the same process with 3 different "modes"
val modes = Seq("mode1", "mode2", "mode3")
// We cache the RDD before
val nb = filesRDD.cache().count()
logger.debug($nb + " files as input")
modes.map(mode => {
logger.debug("Processing files with mode : " + mode)
myProcessor.process(mode, filesRDD)
})
filesRDD.unpersist() // I tried with or without this
[...]
}生成的日志(例如,以3个档案作为输入)如下:
从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 3个文件作为输入 以模式处理文件: mode1 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 以模式处理文件: mode2 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 以模式处理文件: mode3 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C
我的星火配置:
我从这些日志中了解到的是,文件提取是执行4次安装一次!这显然导致了堆空间问题和性能泄露..。
我做错什么了吗?
编辑:我也尝试使用modes.foreach(...)而不是地图,但是没有什么改变.
发布于 2019-02-28 15:22:40
好吧,经过很多测试,我终于解决了这个问题。事实上,有两个问题:
cache或persist函数太大,无法完全存储在总内存的60%中,我就知道这一点,但我认为我的输入数据不太大,但实际上我的RDD是80 my。但是我60%的内存(160 my )仍然超过80 my,那么发生了什么呢?问题n°2的答案..。为了解决这些问题,我不得不使用persist(StorageLevel.MEMORY_SER),它增加了计算时间,但大大减少了内存使用(根据这个基准),并将分区号设置为1000 (根据Spark文档,其中推荐分区为128 of )。
发布于 2019-02-19 14:10:58
您是否尝试过将modes.map结果传递给列表构造函数(即List(modes.map{ /*...*/}))?有时(我不确定什么时候) Scala集合延迟评估映射,所以如果这些映射直到spark删除缓存后才被评估,那么它将不得不重新计算。
https://stackoverflow.com/questions/54692846
复制相似问题