我正试着用Kedro写增量表。将文件格式更改为增量会使写入为增量表,模式为覆盖。
以前,原始层(meta_reload)中的节点创建一个数据集,该数据集确定每个数据集的增量加载的开始日期。每个节点使用该原始数据集来过滤工作数据集,以应用转换逻辑并增量地写入分区拼图表格。
但是现在,将增量写入模式为覆盖,仅对增量进行文件类型更改会使当前增量数据覆盖所有过去的数据,而不仅仅是那些分区。因此,我需要在目录中的save_args中使用replaceWhere选项。当我需要读取replaceWhere原始数据集来确定日期时,如何确定目录中meta_reload的开始日期。有没有一种方法可以从节点内部动态传递save_args?
my_dataset:
type: my_project.io.pyspark.SparkDataSet
filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
file_format: delta
layer: intermediate
save_args:
mode: "overwrite"
replaceWhere: "DATE_ID > xyz" ## what I want to implement dynamically
partitionBy: [ "DATE_ID" ]发布于 2021-09-29 15:41:11
我已经在GH discussion上回答了这个问题。简而言之,您需要子类化并定义您自己的SparkDataSet,我们避免在Kedro级别更改datasets的底层API,但鼓励您根据自己的目的对其进行更改和重新混合。
https://stackoverflow.com/questions/69378898
复制相似问题