首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >每隔5小时找一次最小值

每隔5小时找一次最小值
EN

Stack Overflow用户
提问于 2020-11-08 15:37:14
回答 2查看 225关注 0票数 3

我的df

代码语言:javascript
复制
val df = Seq(
  ("1", 1),
  ("1", 1),
  ("1", 2),
  ("1", 4),
  ("1", 5),
  ("1", 6),
  ("1", 8),
  ("1", 12),
  ("1", 12),
  ("1", 13),
  ("1", 14),
  ("1", 15),
  ("1", 16)
).toDF("id", "time")

在这种情况下,第一个间隔从1小时开始。因此,每一行到6 (1 + 5)都是这个间隔的一部分。

但是8-1> 5,所以第二个间隔从8开始,上升到13。

然后我看到14-8> 5,所以第三个开始,以此类推。

期望的结果

代码语言:javascript
复制
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1  |1   |1       |
|1  |1   |1       |
|1  |2   |1       |
|1  |4   |1       |
|1  |5   |1       |
|1  |6   |1       |
|1  |8   |8       |
|1  |12  |8       |
|1  |12  |8       |
|1  |13  |8       |
|1  |14  |14      |
|1  |15  |14      |
|1  |16  |14      |
+---+----+--------+

我试着用min函数来完成它,但是不知道如何解释这种情况。

代码语言:javascript
复制
val window = Window.partitionBy($"id").orderBy($"time")
df
.select($"id", $"time")
.withColumn("min_time", when(($"time" - min($"time").over(window)) <= 5, min($"time").over(window)).otherwise($"time"))
.show(false)

我得到了什么

代码语言:javascript
复制
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1  |1   |1       |
|1  |1   |1       |
|1  |2   |1       |
|1  |4   |1       |
|1  |5   |1       |
|1  |6   |1       |
|1  |8   |8       |
|1  |12  |12      |
|1  |12  |12      |
|1  |13  |13      |
|1  |14  |14      |
|1  |15  |15      |
|1  |16  |16      |
+---+----+--------+
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-11-11 14:33:55

您可以使用在窗户上使用聚合函数的第一个想法。但是,与其使用Spark已经定义的函数的某些组合,您还可以定义自己的Spark用户定义的聚合函数 (UDAF)。

分析

正如您正确地假设的,我们应该在窗口上使用一种min函数。在此窗口的行上,我们希望实现以下规则:

给定按time排序的行,如果上一行的min_time与当前行的time之间的差值大于5,那么当前行的min_time应该是当前行的time,否则当前行的min_time应该是上一行的min_time

但是,使用Spark提供的聚合函数,我们无法访问上一行的min_time。它存在一个lag函数,但是使用该函数,我们只能访问前几行的现有值。由于上一行的min_time尚未出现,我们无法访问它。

因此,我们必须定义自己的聚合函数。

解决方案

定义定制的聚合函数

要定义聚合函数,我们需要创建一个扩展集料器抽象类的类。以下是完整的实施情况:

代码语言:javascript
复制
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

object MinByInterval extends Aggregator[Integer, Integer, Integer] {

  def zero: Integer = null

  def reduce(buffer: Integer, time: Integer): Integer = {
    if (buffer == null || time - buffer > 5) time else buffer
  }

  def merge(b1: Integer, b2: Integer): Integer = {
    throw new NotImplementedError("should not use as general aggregation")
  }

  def finish(reduction: Integer): Integer = reduction

  def bufferEncoder: Encoder[Integer] = Encoders.INT

  def outputEncoder: Encoder[Integer] = Encoders.INT

}

我们使用Integer作为输入、缓冲区和输出类型。我们选择了Integer,因为它是一个可空的Int。我们本可以使用Option[Int],但是Spark的文档建议不要在聚合器方法中重新创建对象以解决性能问题,如果我们使用Option这样的复杂类型会发生什么。

我们实现了reduce方法中分析部分中定义的规则:

代码语言:javascript
复制
def reduce(buffer: Integer, time: Integer): Integer = {
  if (buffer == null || time - buffer > 5) time else buffer
}

这里,time是当前行列时间中的值,buffer是以前计算的值,因此对应于上一行的列min_time。在我们的窗口中,我们按照time对行进行排序,time总是大于buffer。只有在处理第一行时才会出现空缓冲区情况。

当在窗口上使用聚合函数时,不使用merge方法,因此我们不实现它。

finish方法是身份方法,因为我们不需要对聚合值和输出执行最终计算,缓冲区编码器是Encoders.INT

调用用户定义的聚合函数

现在,我们可以使用以下代码调用用户定义的聚合函数:

代码语言:javascript
复制
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}

val minTime = udaf(MinByInterval)
val window = Window.partitionBy("id").orderBy("time")
df.withColumn("min_time", minTime(col("time")).over(window))

给定问题中的输入数据,我们得到:

代码语言:javascript
复制
+---+----+--------+
|id |time|min_time|
+---+----+--------+
|1  |1   |1       |
|1  |1   |1       |
|1  |2   |1       |
|1  |4   |1       |
|1  |5   |1       |
|1  |6   |1       |
|1  |8   |8       |
|1  |12  |8       |
|1  |12  |8       |
|1  |13  |8       |
|1  |14  |14      |
|1  |15  |14      |
|1  |16  |14      |
+---+----+--------+
票数 3
EN

Stack Overflow用户

发布于 2020-11-10 07:17:31

输入数据

代码语言:javascript
复制
val df = Seq(
  ("1", 1),
  ("1", 1),
  ("1", 2),
  ("1", 4),
  ("1", 5),
  ("1", 6),
  ("1", 8),
  ("1", 12),
  ("1", 12),
  ("1", 13),
  ("1", 14),
  ("1", 15),
  ("1", 16),
  ("2", 4),
  ("2", 8),
  ("2", 10),
  ("2", 11),
  ("2", 11),
  ("2", 12),
  ("2", 13),
  ("2", 20)
).toDF("id", "time")

必须对数据进行排序,否则结果将被输入。

代码语言:javascript
复制
val window = Window.partitionBy($"id").orderBy($"time")
df
  .withColumn("min", row_number().over(window))
  .as[Row]
  .map(_.getMin)
  .show(40)

之后,我创建一个case类。var min用于保存最小值,并且只在满足条件时才更新。

代码语言:javascript
复制
case class Row(id:String, time:Int, min:Int){
  def getMin: Row = {
    if(time - Row.min > 5 || Row.min == -99 || min == 1){
      Row.min = time
    }
    Row(id, time, Row.min)
  }
}

object Row{
  var min: Int = -99
}

结果

代码语言:javascript
复制
+---+----+---+
| id|time|min|
+---+----+---+
|  1|   1|  1|
|  1|   1|  1|
|  1|   2|  1|
|  1|   4|  1|
|  1|   5|  1|
|  1|   6|  1|
|  1|   8|  8|
|  1|  12|  8|
|  1|  12|  8|
|  1|  13|  8|
|  1|  14| 14|
|  1|  15| 14|
|  1|  16| 14|
|  2|   4|  4|
|  2|   8|  4|
|  2|  10| 10|
|  2|  11| 10|
|  2|  11| 10|
|  2|  12| 10|
|  2|  13| 10|
|  2|  20| 20|
+---+----+---+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64739902

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档