我正在尝试加载存储在S3上的~1M文件集。运行sc.binaryFiles("s3a://BUCKETNAME/*").count()时
我要去接WARN TaskSetManager: Stage 0 contains a task of very large size (177 KB). The maximum recommended task size is 100 KB了。这之后是失败的任务。
我看到它为这个阶段推断了128个分区,这太低了,请注意,当在400 K文件桶上运行相同的命令时,分区的数量会更高(~2K分区),并且操作将成功。
设置更高的minPartitions没有帮助;设置更高的spark.default.parallelism也没有帮助。
唯一起作用的是创建多个包含1000个文件的较小的RDD,并在它们上运行sc.union,这种方法的问题是太慢了。
如何减轻这一问题?
UPDATE:继续查看如何在BinaryFileRDD.getPartitions()中确定分区数,这使我看到了这段代码:
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
val defaultParallelism = sc.defaultParallelism
val files = listStatus(context).asScala
val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}我跟踪了计算,但还是没有意义,我应该得到一个更大的数字。
因此,我尝试减少config.FILES_MAX_PARTITION_BYTES配置(spark.files.maxPartitionBytes) --这确实增加了分区的数量,并完成了作业,但是我仍然得到了最初的警告(任务大小稍微小了一点),而且,与在400 K文件集上运行时相比,分区的数量要小得多。
发布于 2017-09-17 11:49:32
问题的根源在于文件的大小:令我惊讶的是,s3中的文件没有正确上传,它们的大小比它们应该的大小小100倍。这导致setMinPartitions计算包含大量小文件的拆分。每个拆分实际上是一个逗号分隔的文件路径字符串,因为每个拆分有许多文件,所以我们得到了一个非常长的指令字符串,应该传递给所有工作人员。这就加重了网络的负担,导致了整个流程的失败。将spark.files.maxPartitionBytes设置为较低的值解决了这个问题。
https://stackoverflow.com/questions/46223130
复制相似问题