首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache -如何在3之后定义UserDefinedAggregateFunction?

Apache -如何在3之后定义UserDefinedAggregateFunction?
EN

Stack Overflow用户
提问于 2021-03-25 23:22:24
回答 1查看 960关注 0票数 2

我正在使用Spark3.0,为了使用用户定义的函数作为窗口函数,我需要一个UserDefinedAggregateFunction实例。最初,我认为使用新的Aggregatorudaf可以解决这个问题(如这里所示),但udaf返回的是UserDefinedFunction,而不是UserDefinedAggregateFunction

从Spark3.0开始,UserDefinedAggregateFunction就被废弃了,就像声明的这里 (尽管仍然可以保持到处找它)。

所以问题是:在Spark3.0中是否有一种正确(不反对)的方法来定义适当的UserDefinedAggregateFunction并将其用作窗口函数?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-26 14:54:13

在Spark 3中,新API使用Aggregator定义用户定义的聚合:

抽象类Aggregator[-IN, BUF, OUT]扩展了可序列化的: 用户定义聚合的基类,可在Dataset操作中使用,以获取组的所有元素并将其还原为单个值。

与不推荐的联合发展新议程相比,聚合器带来了性能改进。您可以看到问题有效的用户定义聚合器

下面是一个关于如何定义平均聚合器并使用functions.udaf方法注册它的示例:

代码语言:javascript
复制
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

val meanAgg= new Aggregator[Long, (Long, Long), Double]() {
    
    def zero = (0L, 0L) // Init the buffer
  
    def reduce(y: (Long, Long), x: Long) = (y._1 + x, y._2 + 1)

    def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2)

    def finish(r: (Long, Long)) = r._1.toDouble / r._2
  
    def bufferEncoder: Encoder[(Long, Long)] = implicitly(ExpressionEncoder[(Long, Long)])

    def outputEncoder: Encoder[Double] = implicitly(ExpressionEncoder[Double])
}

val meanUdaf = udaf(meanAgg)

与窗口一起使用:

代码语言:javascript
复制
val df = Seq(
  (1, 2), (1, 5),
  (2, 3), (2, 1),
).toDF("id", "value")
    
df.withColumn("mean", meanUdaf($"value").over(Window.partitionBy($"id"))).show
//+---+-----+----+
//| id|value|mean|
//+---+-----+----+
//|  1|    2| 3.5|
//|  1|    5| 3.5|
//|  2|    3| 2.0|
//|  2|    1| 2.0|
//+---+-----+----+
票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66808917

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档