首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花缓存的RDD是计算n次。

火花缓存的RDD是计算n次。
EN

Stack Overflow用户
提问于 2019-02-14 14:33:48
回答 2查看 181关注 0票数 5

我正面临星火应用程序的问题。下面是我的代码的简化版本:

代码语言:javascript
复制
def main(args: Array[String]) {
    // Initializing spark context
    val sc = new SparkContext()
    val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
    System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)

    // Getting files from TGZ archives
    val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
    val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
        logger.debug("Getting files from archive : "+tgzStream._1)
        utils.getFilesFromTgzStream(tgzStream._2)
    })

    // We run the same process with 3 different "modes"
    val modes = Seq("mode1", "mode2", "mode3")

    // We cache the RDD before
    val nb = filesRDD.cache().count()
    logger.debug($nb + " files as input")

    modes.map(mode => {
        logger.debug("Processing files with mode : " + mode)
        myProcessor.process(mode, filesRDD)
    })

    filesRDD.unpersist() // I tried with or without this

    [...]
}

生成的日志(例如,以3个档案作为输入)如下:

从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 3个文件作为输入 以模式处理文件: mode1 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 以模式处理文件: mode2 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C 以模式处理文件: mode3 从归档中获取文件: 从档案中获取文件:B 从归档中获取文件:C

我的星火配置:

  • 版本: 1.6.2
  • 执行器: 20 x 2 2CPU x8 8Go
  • 每个执行器的纱线开销存储器:800 per
  • 驱动程序:1 1CPU x8 8Go

我从这些日志中了解到的是,文件提取是执行4次安装一次!这显然导致了堆空间问题和性能泄露..。

我做错什么了吗?

编辑:我也尝试使用modes.foreach(...)而不是地图,但是没有什么改变.

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-02-28 15:22:40

好吧,经过很多测试,我终于解决了这个问题。事实上,有两个问题:

  1. 我低估了输入数据的大小:如果Spark的cachepersist函数太大,无法完全存储在总内存的60%中,我就知道这一点,但我认为我的输入数据不太大,但实际上我的RDD是80 my。但是我60%的内存(160 my )仍然超过80 my,那么发生了什么呢?问题n°2的答案..。
  2. 我的分区太大了:在我的代码中的某个地方,的分区数被设置为100个,所以我有100个分区,每个分区1.6GB。问题是,我的数据是由几十个Megs组成的字符串组成的,所以我的分区没有满,而10 8GB的使用内存实际上只包含7或8GB的实际数据。

为了解决这些问题,我不得不使用persist(StorageLevel.MEMORY_SER),它增加了计算时间,但大大减少了内存使用(根据这个基准),并将分区号设置为1000 (根据Spark文档,其中推荐分区为128 of )。

票数 0
EN

Stack Overflow用户

发布于 2019-02-19 14:10:58

您是否尝试过将modes.map结果传递给列表构造函数(即List(modes.map{ /*...*/}))?有时(我不确定什么时候) Scala集合延迟评估映射,所以如果这些映射直到spark删除缓存后才被评估,那么它将不得不重新计算。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54692846

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档