我有一个文件夹,里面有14个文件。我运行星星之交,在集群上有10个执行者,其中有资源管理器作为纱线。
我创建了第一个RDD,如下所示:
JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);然而,files.getNumPartitions()随机地给了我7或8。然后,我不会在任何地方使用合并/重新分区,而是使用7-8分区完成我的DAG。
正如我所知,我们作为“最小”分区数进行了论证,那么为什么Spark将我的RDD划分为7-8个分区呢?
我还用20个分区运行了相同的程序,它给了我11个分区。
我在这里看到了一个主题,但是它是关于“更多”分区的,这对我一点帮助都没有。
注意:在程序中,我读取了另一个包含10个文件的文件夹,Spark成功地创建了10个分区。在这个成功的工作完成后,我运行上面的问题转换。
文件大小: 1)25.07 KB 2)46.61 KB 3)126.34 KB 4)158.15 KB 5)169.21 KB 6)16.03 KB 7)67.41 KB 8)60.84 KB 9)70.83 KB 10)87.94 KB 11)99.29 KB 12)120.58 KB 13)170.43 KB 14)183.87 KB
文件位于HDFS上,块大小为128 on,复制因子为3。
发布于 2018-08-01 13:11:22
如果我们有每个文件的大小,那就更清楚了。但代码不会出错。我将按照火花代码库添加这个答案。
wholeTextFiles中传递。
def setMinPartitions(上下文: JobContext,minPartitions: Int) { val files =listStatus(上下文).asScala val totalLen =files.map(文件=> if (file.isDirectory) 0L files file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 files) // file:}//file:
链接maxSplitSize划分(Spark中的分区)将从源中提取。
inputFormat.setMinPartitions(jobContext,minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray //这里的拆分数将决定val结果=新的数组分区 (i <- 0,直到rawSplits.size) { result(i) =新NewHadoopPartition(id,i,rawSplits(i).asInstanceOfInputSplit with Writable) } // file: WholeTextFileRDD.scala
链接CombineFileInputFormat#getSplits类中有关读取文件和准备拆分的更多信息。
注意: 我将火花分区称为MapReduce拆分,因为MapReduce借用了MapReduce的输入和输出格式化程序
https://stackoverflow.com/questions/51628875
复制相似问题