首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark创建的分区比minPartitions在WholeTextFiles上的参数少

Spark创建的分区比minPartitions在WholeTextFiles上的参数少
EN

Stack Overflow用户
提问于 2018-08-01 08:39:57
回答 1查看 659关注 0票数 2

我有一个文件夹,里面有14个文件。我运行星星之交,在集群上有10个执行者,其中有资源管理器作为纱线。

我创建了第一个RDD,如下所示:

代码语言:javascript
复制
JavaPairRDD<String,String> files = sc.wholeTextFiles(folderPath.toString(), 10);

然而,files.getNumPartitions()随机地给了我7或8。然后,我不会在任何地方使用合并/重新分区,而是使用7-8分区完成我的DAG。

正如我所知,我们作为“最小”分区数进行了论证,那么为什么Spark将我的RDD划分为7-8个分区呢?

我还用20个分区运行了相同的程序,它给了我11个分区。

我在这里看到了一个主题,但是它是关于“更多”分区的,这对我一点帮助都没有。

注意:在程序中,我读取了另一个包含10个文件的文件夹,Spark成功地创建了10个分区。在这个成功的工作完成后,我运行上面的问题转换。

文件大小: 1)25.07 KB 2)46.61 KB 3)126.34 KB 4)158.15 KB 5)169.21 KB 6)16.03 KB 7)67.41 KB 8)60.84 KB 9)70.83 KB 10)87.94 KB 11)99.29 KB 12)120.58 KB 13)170.43 KB 14)183.87 KB

文件位于HDFS上,块大小为128 on,复制因子为3。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-08-01 13:11:22

如果我们有每个文件的大小,那就更清楚了。但代码不会出错。我将按照火花代码库添加这个答案。

  • 首先,maxSplitSize将根据目录大小min分区wholeTextFiles中传递。 def setMinPartitions(上下文: JobContext,minPartitions: Int) { val files =listStatus(上下文).asScala val totalLen =files.map(文件=> if (file.isDirectory) 0L files file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 files) // file:}//file: 链接
  • 根据maxSplitSize划分(Spark中的分区)将从源中提取。 inputFormat.setMinPartitions(jobContext,minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray //这里的拆分数将决定val结果=新的数组分区 (i <- 0,直到rawSplits.size) { result(i) =新NewHadoopPartition(id,i,rawSplits(i).asInstanceOfInputSplit with Writable) } // file: WholeTextFileRDD.scala 链接

CombineFileInputFormat#getSplits类中有关读取文件和准备拆分的更多信息。

注意: 我将火花分区称为MapReduce拆分,因为MapReduce借用了MapReduce的输入和输出格式化程序

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51628875

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档