我正在使用Spark3.0,为了使用用户定义的函数作为窗口函数,我需要一个UserDefinedAggregateFunction实例。最初,我认为使用新的Aggregator和udaf可以解决这个问题(如这里所示),但udaf返回的是UserDefinedFunction,而不是UserDefinedAggregateFunction。
从Spark3.0开始,UserDefinedAggregateFunction就被废弃了,就像声明的这里 (尽管仍然可以保持到处找它)。
所以问题是:在Spark3.0中是否有一种正确(不反对)的方法来定义适当的UserDefinedAggregateFunction并将其用作窗口函数?
发布于 2021-03-26 14:54:13
在Spark 3中,新API使用Aggregator定义用户定义的聚合:
抽象类
Aggregator[-IN, BUF, OUT]扩展了可序列化的: 用户定义聚合的基类,可在Dataset操作中使用,以获取组的所有元素并将其还原为单个值。
与不推荐的联合发展新议程相比,聚合器带来了性能改进。您可以看到问题有效的用户定义聚合器。
下面是一个关于如何定义平均聚合器并使用functions.udaf方法注册它的示例:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
val meanAgg= new Aggregator[Long, (Long, Long), Double]() {
def zero = (0L, 0L) // Init the buffer
def reduce(y: (Long, Long), x: Long) = (y._1 + x, y._2 + 1)
def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2)
def finish(r: (Long, Long)) = r._1.toDouble / r._2
def bufferEncoder: Encoder[(Long, Long)] = implicitly(ExpressionEncoder[(Long, Long)])
def outputEncoder: Encoder[Double] = implicitly(ExpressionEncoder[Double])
}
val meanUdaf = udaf(meanAgg)与窗口一起使用:
val df = Seq(
(1, 2), (1, 5),
(2, 3), (2, 1),
).toDF("id", "value")
df.withColumn("mean", meanUdaf($"value").over(Window.partitionBy($"id"))).show
//+---+-----+----+
//| id|value|mean|
//+---+-----+----+
//| 1| 2| 3.5|
//| 1| 5| 3.5|
//| 2| 3| 2.0|
//| 2| 1| 2.0|
//+---+-----+----+https://stackoverflow.com/questions/66808917
复制相似问题