我的df
val df = Seq(
("1", 1),
("1", 1),
("1", 2),
("1", 4),
("1", 5),
("1", 6),
("1", 8),
("1", 12),
("1", 12),
("1", 13),
("1", 14),
("1", 15),
("1", 16)
).toDF("id", "time")在这种情况下,第一个间隔从1小时开始。因此,每一行到6 (1 + 5)都是这个间隔的一部分。
但是8-1> 5,所以第二个间隔从8开始,上升到13。
然后我看到14-8> 5,所以第三个开始,以此类推。
期望的结果
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1 |1 |1 |
|1 |1 |1 |
|1 |2 |1 |
|1 |4 |1 |
|1 |5 |1 |
|1 |6 |1 |
|1 |8 |8 |
|1 |12 |8 |
|1 |12 |8 |
|1 |13 |8 |
|1 |14 |14 |
|1 |15 |14 |
|1 |16 |14 |
+---+----+--------+我试着用min函数来完成它,但是不知道如何解释这种情况。
val window = Window.partitionBy($"id").orderBy($"time")
df
.select($"id", $"time")
.withColumn("min_time", when(($"time" - min($"time").over(window)) <= 5, min($"time").over(window)).otherwise($"time"))
.show(false)我得到了什么
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1 |1 |1 |
|1 |1 |1 |
|1 |2 |1 |
|1 |4 |1 |
|1 |5 |1 |
|1 |6 |1 |
|1 |8 |8 |
|1 |12 |12 |
|1 |12 |12 |
|1 |13 |13 |
|1 |14 |14 |
|1 |15 |15 |
|1 |16 |16 |
+---+----+--------+发布于 2020-11-11 14:33:55
您可以使用在窗户上使用聚合函数的第一个想法。但是,与其使用Spark已经定义的函数的某些组合,您还可以定义自己的Spark用户定义的聚合函数 (UDAF)。
分析
正如您正确地假设的,我们应该在窗口上使用一种min函数。在此窗口的行上,我们希望实现以下规则:
给定按
time排序的行,如果上一行的min_time与当前行的time之间的差值大于5,那么当前行的min_time应该是当前行的time,否则当前行的min_time应该是上一行的min_time。
但是,使用Spark提供的聚合函数,我们无法访问上一行的min_time。它存在一个lag函数,但是使用该函数,我们只能访问前几行的现有值。由于上一行的min_time尚未出现,我们无法访问它。
因此,我们必须定义自己的聚合函数。
解决方案
定义定制的聚合函数
要定义聚合函数,我们需要创建一个扩展集料器抽象类的类。以下是完整的实施情况:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
object MinByInterval extends Aggregator[Integer, Integer, Integer] {
def zero: Integer = null
def reduce(buffer: Integer, time: Integer): Integer = {
if (buffer == null || time - buffer > 5) time else buffer
}
def merge(b1: Integer, b2: Integer): Integer = {
throw new NotImplementedError("should not use as general aggregation")
}
def finish(reduction: Integer): Integer = reduction
def bufferEncoder: Encoder[Integer] = Encoders.INT
def outputEncoder: Encoder[Integer] = Encoders.INT
}我们使用Integer作为输入、缓冲区和输出类型。我们选择了Integer,因为它是一个可空的Int。我们本可以使用Option[Int],但是Spark的文档建议不要在聚合器方法中重新创建对象以解决性能问题,如果我们使用Option这样的复杂类型会发生什么。
我们实现了reduce方法中分析部分中定义的规则:
def reduce(buffer: Integer, time: Integer): Integer = {
if (buffer == null || time - buffer > 5) time else buffer
}这里,time是当前行列时间中的值,buffer是以前计算的值,因此对应于上一行的列min_time。在我们的窗口中,我们按照time对行进行排序,time总是大于buffer。只有在处理第一行时才会出现空缓冲区情况。
当在窗口上使用聚合函数时,不使用merge方法,因此我们不实现它。
finish方法是身份方法,因为我们不需要对聚合值和输出执行最终计算,缓冲区编码器是Encoders.INT
调用用户定义的聚合函数
现在,我们可以使用以下代码调用用户定义的聚合函数:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}
val minTime = udaf(MinByInterval)
val window = Window.partitionBy("id").orderBy("time")
df.withColumn("min_time", minTime(col("time")).over(window))跑
给定问题中的输入数据,我们得到:
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1 |1 |1 |
|1 |1 |1 |
|1 |2 |1 |
|1 |4 |1 |
|1 |5 |1 |
|1 |6 |1 |
|1 |8 |8 |
|1 |12 |8 |
|1 |12 |8 |
|1 |13 |8 |
|1 |14 |14 |
|1 |15 |14 |
|1 |16 |14 |
+---+----+--------+发布于 2020-11-10 07:17:31
输入数据
val df = Seq(
("1", 1),
("1", 1),
("1", 2),
("1", 4),
("1", 5),
("1", 6),
("1", 8),
("1", 12),
("1", 12),
("1", 13),
("1", 14),
("1", 15),
("1", 16),
("2", 4),
("2", 8),
("2", 10),
("2", 11),
("2", 11),
("2", 12),
("2", 13),
("2", 20)
).toDF("id", "time")必须对数据进行排序,否则结果将被输入。
val window = Window.partitionBy($"id").orderBy($"time")
df
.withColumn("min", row_number().over(window))
.as[Row]
.map(_.getMin)
.show(40)之后,我创建一个case类。var min用于保存最小值,并且只在满足条件时才更新。
case class Row(id:String, time:Int, min:Int){
def getMin: Row = {
if(time - Row.min > 5 || Row.min == -99 || min == 1){
Row.min = time
}
Row(id, time, Row.min)
}
}
object Row{
var min: Int = -99
}结果
+---+----+---+
| id|time|min|
+---+----+---+
| 1| 1| 1|
| 1| 1| 1|
| 1| 2| 1|
| 1| 4| 1|
| 1| 5| 1|
| 1| 6| 1|
| 1| 8| 8|
| 1| 12| 8|
| 1| 12| 8|
| 1| 13| 8|
| 1| 14| 14|
| 1| 15| 14|
| 1| 16| 14|
| 2| 4| 4|
| 2| 8| 4|
| 2| 10| 10|
| 2| 11| 10|
| 2| 11| 10|
| 2| 12| 10|
| 2| 13| 10|
| 2| 20| 20|
+---+----+---+https://stackoverflow.com/questions/64739902
复制相似问题