首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在不使用udaf的情况下创建映射列来计算出现次数

如何在不使用udaf的情况下创建映射列来计算出现次数
EN

Stack Overflow用户
提问于 2020-10-14 00:24:07
回答 4查看 483关注 0票数 3

我想创建一个Map列来统计出现的次数。

例如:

代码语言:javascript
复制
+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

会导致

代码语言:javascript
复制
+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

目前,在Spark 2.4.6中,我能够使用udaf实现它。

当我碰到Spark3时,我想知道我是否可以摆脱这个udaf (我尝试使用新方法aggregate,但没有成功)。

有没有一种有效的方法来做到这一点?(对于效率部分,我可以很容易地进行测试)

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2020-10-14 03:23:20

下面是Spark 3的解决方案:

代码语言:javascript
复制
import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

提供:

代码语言:javascript
复制
+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这里是使用Aggregator的解决方案

代码语言:javascript
复制
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String,Int], Map[String,Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,Int], a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,Int], b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

提供:

代码语言:javascript
复制
+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+
票数 3
EN

Stack Overflow用户

发布于 2020-10-14 03:30:06

您可以始终将collect_list与UDF一起使用,但前提是您的分组不是太大:

代码语言:javascript
复制
val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

提供:

代码语言:javascript
复制
+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

这应该比UDAF更快:Spark custom aggregation : collect

票数 2
EN

Stack Overflow用户

发布于 2020-10-14 01:24:17

我们可以实现这是spark 2.4

代码语言:javascript
复制
//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",  when($"a".isNotNull, map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64339279

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档