来自关系数据库的数据被加载到火花中--据说每天都有,但实际上并不是每天都有。此外,它是DB增量加载的完整副本。
为了方便地将维度表与主事件数据连接起来,我想:
_to/valid_from列,所以即使数据不能每天(不一致地)使用,仍然可以很好地(从下游)使用。
我正在使用spark 3.0.1,并希望使用SCD2样式来转换现有的数据-而不会丢失历史记录。
spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int, value:Int, date:String)
val d = Seq(Foo(1, 1, "20200101"), Foo(1, 8, "20200102"), Foo(1, 9, "20200120"),Foo(1, 9, "20200121"),Foo(1, 9, "20200122"), Foo(1, 1, "20200103"), Foo(2, 5, "20200101"), Foo(1, 10, "20200113")).toDF
d.show
val windowDeduplication = Window.partitionBy("key", "value").orderBy("key", "date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key", "date")
val nextThing = lead("date", 1).over(windowPrimaryKey)
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to", nextThing).withColumn("valid_to", when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show在以下方面的成果:
+---+-----+----------+----------+
|key|value|valid_from| valid_to|
+---+-----+----------+----------+
| 1| 1|2020-01-01|2020-01-01|
| 1| 8|2020-01-02|2020-01-12|
| 1| 10|2020-01-13|2020-01-19|
| 1| 9|2020-01-20|2020-10-09|
| 2| 5|2020-01-01|2020-10-09|
+---+-----+----------+----------+这已经相当不错了。然而:
| 1| 1|2020-01-03| 2|2020-01-12|迷失了。也就是说,以后(在中介更改之后)再次发生的任何值都会丢失。如何保留这一行,而不保留更大的级别,例如:
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).withColumn("rank", rank().over(windowDeduplication)).withColumn("valid_to", nextThing).withColumn("valid_to",
when(nextThing.isNotNull, date_sub(nextThing, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").orderBy("key", "valid_from", "valid_to").show
+---+-----+----------+----+----------+
|key|value|valid_from|rank| valid_to|
+---+-----+----------+----+----------+
| 1| 1|2020-01-01| 1|2020-01-01|
| 1| 8|2020-01-02| 1|2020-01-02|
| 1| 1|2020-01-03| 2|2020-01-12|
| 1| 10|2020-01-13| 1|2020-01-19|
| 1| 9|2020-01-20| 1|2020-01-20|
| 1| 9|2020-01-21| 2|2020-01-21|
| 1| 9|2020-01-22| 3|2020-10-09|
| 2| 5|2020-01-01| 1|2020-10-09|
+---+-----+----------+----+----------+这绝对不是我们想要的
删除duplicates
对数据进行任何历史性更改
如何正确地将其转换为SCD2表示,即有一个valid_from,valid_to,但不删除中间状态?
注意:我不需要更新现有的数据(合并到,加入)。重新创建/覆盖它是可以的。
也就是说,Implement SCD Type 2 in Spark似乎太复杂了。在我的情况下,是否有更好的方法不需要状态处理?也就是说,我有来自一个数据库的每日完整副本的数据,并想要去复制它。
发布于 2020-10-09 19:24:54
前一种方法只保留副本的第一个(最早)版本。我认为,对于状态处理,没有联接的唯一解决方案是使用一个窗口函数,在该函数中,每个值将与前一行进行比较--如果整个行中没有任何更改,则将其丢弃。
可能效率较低,但更准确。但这也取决于手头的用例,即更改后的值再次出现的可能性。
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int, value:Int, value2:Int, date:String)
val d = Seq(Foo(1, 1,1, "20200101"), Foo(1, 8,1, "20200102"), Foo(1, 9,1, "20200120"),Foo(1, 6,1, "20200121"),Foo(1, 9,1, "20200122"), Foo(1, 1,1, "20200103"), Foo(2, 5,1, "20200101"), Foo(1, 10,1, "20200113"), Foo(1, 9,1, "20210120"),Foo(1, 9,1, "20220121"),Foo(1, 9,3, "20230122")).toDF
def compare2Rows(key:Seq[String], sortChangingIgnored:Seq[String], timeColumn:String)(df:DataFrame):DataFrame = {
val windowPrimaryKey = Window.partitionBy(key.map(col):_*).orderBy(sortChangingIgnored.map(col):_*)
val columnsToCompare = df.drop(key ++ sortChangingIgnored:_*).columns
val nextDataChange = lead(timeColumn, 1).over(windowPrimaryKey)
val deduplicated = df.withColumn("data_changes", columnsToCompare.map(e=> col(e) =!= lead(col(e), 1).over(windowPrimaryKey)).reduce(_ or _)).filter(col("data_changes").isNull or col("data_changes"))
deduplicated.withColumn("valid_to", when(nextDataChange.isNotNull, date_sub(nextDataChange, 1)).otherwise(current_date)).withColumnRenamed("date", "valid_from").drop("data_changes")
}
d.orderBy("key", "date").show
d.withColumn("date", to_date(col("date"), "yyyyMMdd")).transform(compare2Rows(Seq("key"), Seq("date"), "date")).orderBy("key", "valid_from", "valid_to").show返回:
+---+-----+------+----------+----------+
|key|value|value2|valid_from| valid_to|
+---+-----+------+----------+----------+
| 1| 1| 1|2020-01-01|2020-01-01|
| 1| 8| 1|2020-01-02|2020-01-02|
| 1| 1| 1|2020-01-03|2020-01-12|
| 1| 10| 1|2020-01-13|2020-01-19|
| 1| 9| 1|2020-01-20|2020-01-20|
| 1| 6| 1|2020-01-21|2022-01-20|
| 1| 9| 1|2022-01-21|2023-01-21|
| 1| 9| 3|2023-01-22|2020-10-09|
| 2| 5| 1|2020-01-01|2020-10-09|
+---+-----+------+----------+----------+用于以下方面的投入:
+---+-----+------+--------+
|key|value|value2| date|
+---+-----+------+--------+
| 1| 1| 1|20200101|
| 1| 8| 1|20200102|
| 1| 1| 1|20200103|
| 1| 10| 1|20200113|
| 1| 9| 1|20200120|
| 1| 6| 1|20200121|
| 1| 9| 1|20200122|
| 1| 9| 1|20210120|
| 1| 9| 1|20220121|
| 1| 9| 3|20230122|
| 2| 5| 1|20200101|
+---+-----+------+--------+这个函数的缺点是无限数量的状态被建立起来-对于每个键.但是,当我计划将它应用于相当小的维度表时,我认为无论如何都应该是好的。
https://stackoverflow.com/questions/64284308
复制相似问题