我使用的是spark2.4.4。我知道df.repartition(n,col1)是基于col1的散列分区的。那么为什么它在数值列上失败了?
val df1 = spark.range(0,10,1).map( e => (e%2, e)).toDF("key", "value")
df1.repartition(2, $"key").foreachPartition{ it => println(it.toList)}
//List()
//List([0,0], [1,1], [0,2], [1,3], [0,4], [1,5], [0,6], [1,7], [0,8], [1,9])但是当我使用字符串列时,它看起来没问题。
val df1 = spark.range(0,10,1).map( e => ((e%2).toString, e)).toDF("key", "value")
df1.repartition(2, $"key").foreachPartition{ it => println(it.toList)}
//List([1,1], [1,3], [1,5], [1,7], [1,9])
//List([0,0], [0,2], [0,4], [0,6], [0,8])发布于 2019-11-16 15:35:50
...为什么它在数值列上失败?
它不是失败的。如果你看一下这个计划:
== Physical Plan ==
Exchange hashpartitioning(key#9L, 2)
+- *(1) Project [_1#6L AS key#9L, _2#7L AS value#10L]
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#6L, assertnotnull(input[0, scala.Tuple2, true])._2.longValue AS _2#7L]
+- *(1) MapElements <function1>, obj#5: scala.Tuple2
+- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false), obj#4: java.lang.Long
+- *(1) Range (0, 10, step=1, splits=1)您可以看到Spark使用了hashpartitioning。
这意味着对于整数,0和1的散列%2返回相同的结果,而字符串"0"和"1"的散列返回不同的值。尝试将分区重新划分为3个分区,而不是2个分区,并查看差异。
发布于 2019-11-17 02:27:32
您可能会感到困惑,因为0 % 2 == 0和1 % 2 == 1但是Spark SQL使用Murmur3来散列值,包括整数。0和1 Murmur3散列被映射到相同的取模2的值。
注意:这可能很麻烦,但当有很少的键时,Spark根本不会尝试平衡分区,这都取决于散列函数。N°key/n°分区越多,期望的平衡就越令人满意。
https://stackoverflow.com/questions/58887222
复制相似问题