首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星星之火:当运行sc.binaryFiles()时,阶段X包含一个非常大的任务

星星之火:当运行sc.binaryFiles()时,阶段X包含一个非常大的任务
EN

Stack Overflow用户
提问于 2017-09-14 15:33:04
回答 1查看 528关注 0票数 0

我正在尝试加载存储在S3上的~1M文件集。运行sc.binaryFiles("s3a://BUCKETNAME/*").count()

我要去接WARN TaskSetManager: Stage 0 contains a task of very large size (177 KB). The maximum recommended task size is 100 KB了。这之后是失败的任务。

我看到它为这个阶段推断了128个分区,这太低了,请注意,当在400 K文件桶上运行相同的命令时,分区的数量会更高(~2K分区),并且操作将成功。

设置更高的minPartitions没有帮助;设置更高的spark.default.parallelism也没有帮助。

唯一起作用的是创建多个包含1000个文件的较小的RDD,并在它们上运行sc.union,这种方法的问题是太慢了。

如何减轻这一问题?

UPDATE:继续查看如何在BinaryFileRDD.getPartitions()中确定分区数,这使我看到了这段代码:

代码语言:javascript
复制
  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
    val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
    val defaultParallelism = sc.defaultParallelism
    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    super.setMaxSplitSize(maxSplitSize)
  }

我跟踪了计算,但还是没有意义,我应该得到一个更大的数字。

因此,我尝试减少config.FILES_MAX_PARTITION_BYTES配置(spark.files.maxPartitionBytes) --这确实增加了分区的数量,并完成了作业,但是我仍然得到了最初的警告(任务大小稍微小了一点),而且,与在400 K文件集上运行时相比,分区的数量要小得多。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-17 11:49:32

问题的根源在于文件的大小:令我惊讶的是,s3中的文件没有正确上传,它们的大小比它们应该的大小小100倍。这导致setMinPartitions计算包含大量小文件的拆分。每个拆分实际上是一个逗号分隔的文件路径字符串,因为每个拆分有许多文件,所以我们得到了一个非常长的指令字符串,应该传递给所有工作人员。这就加重了网络的负担,导致了整个流程的失败。将spark.files.maxPartitionBytes设置为较低的值解决了这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46223130

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档