我有一个“容量”数据:
scala> sql("create table capacity (id String, capacity Int)");
scala> sql("insert into capacity values ('A', 50), ('B', 100)");
scala> sql("select * from capacity").show(false)
+---+--------+
|id |capacity|
+---+--------+
|A |50 |
|B |100 |
+---+--------+我有另一个“已使用”的数据,有以下信息:
scala> sql ("create table used (id String, capacityId String, used Int)");
scala> sql ("insert into used values ('item1', 'A', 10), ('item2', 'A', 20), ('item3', 'A', 10), ('item4', 'B', 30), ('item5', 'B', 40), ('item6', 'B', 40)")
scala> sql("select * from used order by capacityId").show(false)
+-----+----------+----+
|id |capacityId|used|
+-----+----------+----+
|item1|A |10 |
|item3|A |10 |
|item2|A |20 |
|item6|B |40 |
|item4|B |30 |
|item5|B |40 |
+-----+----------+----+"capacityId“列的”使用“数据是外键到列"id”的“容量”数据。我想计算"capacityLeft“一栏,它是那个时间点的剩余量。
+-----+----------+----+--------------+
|id |capacityId|used| capacityLeft |
+-----+----------+----+--------------+
|item1|A |10 |40 | <- 50(capacity of 'A')-10
|item3|A |10 |30 | <- 40-10
|item2|A |20 |10 | <- 30-20
|item6|B |40 |60 | <- 100(capacity of 'B')-40
|item4|B |30 |30 | <- 60-30
|item5|B |40 |-10 | <- 30-40
+-----+----------+----+--------------+在实际意义上,"createdDate“列用于对”已使用“数据列进行排序。
火花版本: 2.2
发布于 2018-11-20 09:29:12
这可以通过在星火中使用窗口函数来解决。请注意,要使其工作,需要有一个列来跟踪每个capacityId的行顺序。
首先,将两个数据文件连接在一起:
val df = used.join(capacity.withColumnRenamed("id", "capacityId"), Seq("capacityId"), "inner")在这里,重命名capacity dataframe中的id以匹配used dataframe中的id名称,以避免保留重复的列。
现在创建一个窗口并计算使用的列的累积和。取capacity的值,减去累积和,得到剩余的金额:
val w = Window.partitionBy("capacityId").orderBy("createdDate")
val df2 = df.withColumn("capacityLeft", $"capacity" - sum($"used").over(w))使用示例createdDate列生成数据帧:
+----------+-----+----+-----------+--------+------------+
|capacityId| id|used|createdDate|capacity|capacityLeft|
+----------+-----+----+-----------+--------+------------+
| B|item6| 40| 1| 100| 60|
| B|item4| 30| 2| 100| 30|
| B|item5| 40| 3| 100| -10|
| A|item1| 10| 1| 50| 40|
| A|item3| 10| 2| 50| 30|
| A|item2| 20| 3| 50| 10|
+----------+-----+----+-----------+--------+------------+任何不需要的列现在都可以用drop删除。
https://stackoverflow.com/questions/53389165
复制相似问题