我有一个键-值对RDD。RDD包含一些具有重复键的元素,我想将原始RDD拆分为两个RDD:一个存储具有唯一键的元素,另一个存储其余元素。例如,
输入RDD (共6个元素):
<k1,v1>, <k1,v2>, <k1,v3>, <k2,v4>, <k2,v5>, <k3,v6>结果:
唯一键RDD (存储具有唯一键的元素;对于具有相同键的多个元素,可以接受任何元素):
<k1,v1>, <k2, v4>, <k3,v6>重复密钥RDD (存储带有重复密钥的rest元素):
<k1,v2>, <k1,v3>, <k2,v5>在上面的例子中,唯一的RDD有3个元素,复制的RDD也有3个元素。
我尝试使用groupByKey()将具有相同键的元素分组在一起。对于每个键,都有一个元素序列。然而,groupByKey()的性能并不好,因为元素值的数据量非常大,这导致了很大的随机写入数据量。
所以我想知道是否有更好的解决方案。或者,在使用groupByKey()时,有没有办法减少被混洗的数据量?
发布于 2017-11-19 15:10:51
编辑:给定编辑中的新信息,我将首先创建唯一的rdd,然后使用唯一的和原始的创建复制的rdd:
val inputRdd: RDD[(K,V)] = ...
val uniqueRdd: RDD[(K,V)] = inputRdd.reduceByKey((x,y) => x) //keep just a single value for each key
val duplicateRdd = inputRdd
.join(uniqueRdd)
.filter {case(k, (v1,v2)) => v1 != v2}
.map {case(k,(v1,v2)) => (k, v1)} //v2 came from unique rdd还有一些优化的空间。
在上面的解决方案中,将有两次混洗(reduceByKey和join)。
如果我们从一开始就按键重新划分inputRdd,我们将不需要任何额外的混洗,使用此代码将产生更好的性能:
val inputRdd2 = inputRdd.partitionBy(new HashPartitioner(partitions=200) )原始解决方案:
您可以尝试以下方法:
首先计算每对的出现次数,然后拆分成两个rdd。
val inputRdd: RDD[(K,V)] = ...
val countRdd: RDD[((K,V), Int)] = inputRDD
.map((_, 1))
.reduceByKey(_ + _)
.cache
val uniqueRdd = countRdd.map(_._1)
val duplicateRdd = countRdd
.filter(_._2>1)
.flatMap { case(kv, count) =>
(1 to count-1).map(_ => kv)
}发布于 2017-11-22 18:22:29
请使用combineByKey,从而在Map Task上使用组合器,从而减少数据的混洗。
组合器逻辑取决于您的业务逻辑。
http://bytepadding.com/big-data/spark/groupby-vs-reducebykey/
There are multiple ways to reduce shuffle data.
1. Write less from Map task by use of combiner.
2. Send Aggregated serialized objects from Map to reduce.
3. Use combineInputFormts to enhance efficiency of combiners. https://stackoverflow.com/questions/47373464
复制相似问题