在我们的数据管道中,我们从数据源中摄取CDC事件,并将这些更改写入AVRO格式的“增量数据”文件夹中。
然后定期运行Spark作业,将这些“增量数据”与当前版本的“快照表”(ORC格式)合并,以获得上游快照的最新版本。
在这个合并逻辑中:
1)将“增量数据”加载为DataFrame df1。
2)将当前的“快照表”加载为DataFrame df2
3)合并df1和df2,取消复制ids,并使用最新版本的行(使用update_timestamp列)
这个逻辑将“增量数据”和当前“快照表”的整个数据加载到Spark内存中,这取决于数据库。
我注意到,在Delta Lake中,使用以下代码完成了类似的操作:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()在这里,"updatesDF“可以被认为是来自疾控中心的”增量数据“。
我的问题
1)合并/插入内部是如何工作的?它是否将整个"updatedDF“和"/data/events/”加载到火花内存中?
2)如果没有,它是否应用了增量更改,类似于Apache?
3)在重复过程中,这个插入逻辑是如何获取记录的最新版本的?因为我没有看到指定"update时间戳“列的任何设置?
发布于 2019-12-27 23:55:37
1) How does merge/upsert internally works? Does it load entire "updatedDF" and
"/data/events/" into Spark memory?不,Spark不需要加载整个Delta DF,它需要更新到内存中。否则,它将是不可扩展的。它所采用的方法非常类似于Spark所做的其他作业--如果数据集足够大(或者云创建显式分区),整个表将透明地分成多个分区。然后为每个分区分配一个组成merge作业的任务。任务可以运行在不同的火花执行者等。
2) If not, does it apply the delta changes something similar to Apache Hudi ?我听说过Apache Hudi,但还没看过。在内部,Delta看起来像版本化的拼花文件。对表的更改按顺序存储,原子单元称为提交。当您保存一个表时--看看它有哪些文件--它将有类似于000000.json、000001.json等的文件,其中每个文件都将引用子目录中底层拼花文件的一组操作。例如,000000.json会说,这个版本在时间上引用了拼花文件001和002,以及000001.json会说,这个版本在时间上不应该引用这两个旧的拼花文件,而只使用拼花文件003。
3) During deduplication how this upsert logic knows to take the latest version of a record?
Because I don't see any setting to specify the "update timestamp" column?默认情况下,它引用最近的更改集。时间戳是在Delta中实现此版本控制的内部机制。您可以通过AS OF语法引用旧的快照-请参阅https://docs.databricks.com/delta/delta-batch.html#syntax
https://stackoverflow.com/questions/59476892
复制相似问题