我正在阅读Spark的源代码,我在它的shuffle实现中发现,在shuffle读取过程中,当BlockStoreShuffleReader.read被调用时,它将首先使用ExternalAppendOnlyMap来聚合
def combineValuesByKey(
iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}然后,它将使用ExternalSorter进行排序和聚合。所以这里会有很多磁盘溢出/读取工作。
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
...我的问题是,为什么我们同时需要ExternalSorter和ExternalAppendOnlyMap?有没有可能我们把这两个合并成一个?
我的意思是他们的代码看起来很相似,为什么我们不能用ExternalSorter而不是ExternalAppendOnlyMap呢?因为它既可以聚合又可以排序?
发布于 2020-05-04 18:41:23
免责声明我现在只是在探索星火核心的这一部分,所以我的理解可能是完全错误的。
我的理解是,ExternalAppendOnlyMap只是一个可溢出的跟踪大小的仅追加映射,而ExternalSorter可以是一个缓冲区或映射(基于映射端局部值的映射端组合标志)。
,有没有可能我们把这两个合并成一个?
在这一点上,我认为它们有很多共同之处,而且ExternalSorter似乎更灵活(因为它可以做ExternalAppendOnlyMap做的事情)。
我认为你的问题的答案是“是的”,但很少有人有足够的勇气或足够的勇气来实现这些变化。
https://stackoverflow.com/questions/59242063
复制相似问题