I want to create a Map column that counts the number of occurrences.
For example:
+---+----+
| b| a|
+---+----+
| 1| b|
| 2|null|
| 1| a|
| 1| a|
+---+----+
would result in:
+---+--------------------+
| b| res|
+---+--------------------+
| 1|[a -> 2.0, b -> 1.0]|
| 2| []|
+---+--------------------+
Currently, in Spark 2.4.6, I am able to implement this with a UDAF.
Moving to Spark 3, I was wondering whether I could get rid of this UDAF (I tried using the new aggregate function, but without success).
Is there an efficient way to do this? (As for the efficiency part, I can test it easily.)
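For reference, a minimal sketch of the sample data as a DataFrame (assuming a SparkSession named spark; column types inferred from the example, b: Int and a: String):

import spark.implicits._

val df = Seq((1, Some("b")), (2, None), (1, Some("a")), (1, Some("a")))
  .toDF("b", "a")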
Posted on 2020-10-14 03:23:20
Here is a solution for Spark 3:
import org.apache.spark.sql.functions._
df.groupBy($"b", $"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull, struct($"a", $"count"))
      )
    ).as("res")
  )
  .show()

gives:
+---+----------------+
| b| res|
+---+----------------+
| 1|[b -> 1, a -> 2]|
| 2| []|
+---+----------------+
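A note on the null handling above: when($"a".isNotNull, ...) evaluates to NULL for null keys, collect_list silently drops NULLs, and map_from_entries over an empty array yields an empty map, which is why group b = 2 gets [] rather than null. A quick way to see this in isolation (a sketch reusing the df from the question):

df.filter($"b" === 2)
  .groupBy($"b")
  .agg(map_from_entries(collect_list(when($"a".isNotNull, struct($"a", lit(1))))).as("res"))
  .show()
// res should be an empty map for the b = 2 group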
Here is a solution using an Aggregator:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder
val countOcc = new Aggregator[String, Map[String, Int], Map[String, Int]] with Serializable {
  // Empty buffer: unseen keys default to a count of 0.
  def zero: Map[String, Int] = Map.empty.withDefaultValue(0)
  // Fold one input value into the buffer, ignoring nulls.
  def reduce(b: Map[String, Int], a: String) = if (a != null) b + (a -> (b(a) + 1)) else b
  // Combine two partial buffers by summing the counts per key.
  // getOrElse is needed here: after a shuffle the deserialized buffers
  // no longer carry the default value set in zero.
  def merge(b1: Map[String, Int], b2: Map[String, Int]) = {
    val keys = b1.keys.toSet.union(b2.keys.toSet)
    keys.map { k => k -> (b1.getOrElse(k, 0) + b2.getOrElse(k, 0)) }.toMap
  }
  def finish(b: Map[String, Int]) = b
  def bufferEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
  def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}
val countOccUDAF = udaf(countOcc)
df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

gives:
+---+----------------+
| b| res|
+---+----------------+
| 1|[b -> 1, a -> 2]|
| 2| []|
+---+----------------+
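A side note on the Aggregator route (a sketch, assuming a SparkSession named spark and the definitions above): once wrapped with udaf, it can also be registered by name and used from SQL:

spark.udf.register("count_occ", countOccUDAF)
df.createOrReplaceTempView("t")
spark.sql("SELECT b, count_occ(a) AS res FROM t GROUP BY b").show()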
Posted on 2020-10-14 03:30:06
You can always use collect_list together with a UDF, provided your groups are not too large:
val udf_histo = udf((x: Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b", udf_histo($"as").as("res"))
  .show()

gives:
+---+----------------+
| b| res|
+---+----------------+
| 1|[b -> 1, a -> 2]|
| 2| []|
+---+----------------+
This should be faster than the UDAF: Spark custom aggregation : collect
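One caveat worth adding (my note, not part of the original answer): on Scala 2.13 builds of Spark, mapValues returns a lazy MapView, for which Spark cannot derive an encoder; materializing it with toMap keeps the UDF working there:

val udf_histo = udf((x: Seq[String]) => x.groupBy(identity).mapValues(_.size).toMap)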
Posted on 2020-10-14 01:24:17
We can achieve this in Spark 2.4:
import org.apache.spark.sql.functions._

// Get the counts.
val groupedCountDf = originalDf.groupBy("b", "a").count
// Create a single-entry map for every count (an empty map for a null key),
// then aggregate the maps of each group into an array.
val dfWithArrayOfMaps = groupedCountDf
  .withColumn("newMap", when($"a".isNotNull, map($"a", $"count")).otherwise(map()))
  .groupBy("b").agg(collect_list($"newMap") as "multimap")
// Fold ARRAY[MAP] -> MAP; keys are distinct within a group because we
// grouped by both b and a first, so map_concat never sees duplicate keys.
val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")
val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))
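To inspect the result (a sketch, assuming originalDf holds the sample data from the question):

finalDf.show()
// should match the res tables shown in the answers above,
// with the column named merged_data instead of res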
https://stackoverflow.com/questions/64339279