我正在使用Databricks Delta表,但是在上游的一些表上有一些问题。我知道这是一个相当长的文本下面,但我试图描述我的问题尽可能清楚。如果有些地方不清楚,请告诉我。
我有以下表格和流程:
Landing_zone ->这是一个添加JSON文件的文件夹,其中包含插入或更新记录的数据。Raw_table -> --这是JSON文件中的数据,但是是表格格式的。这张表是三角形格式的。不进行转换,除非将JSON结构转换为表格结构(我做了一个keys,然后从JSON键创建列)。Intermediate_table -> --这是raw_table,但是有一些额外的列(取决于其他列值)。
要从我的着陆区到原始表,我有下面的Pyspark代码:
cloudfile = {"cloudFiles.format":"JSON",
"cloudFiles.schemaLocation": sourceschemalocation,
"cloudFiles.inferColumnTypes": True}
@dlt.view('landing_view')
def inc_view():
df = (spark
.readStream
.format('cloudFiles')
.options(**cloudFilesOptions)
.load(filpath_to_landing)
<Some transformations to go from JSON to tabular (explode, ...)>
return df
dlt.create_target_table('raw_table',
table_properties = {'delta.enableChangeDataFeed': 'true'})
dlt.apply_changes(target='raw_table',
source='landing_view',
keys=['id'],
sequence_by='updated_at')此代码按预期工作。我运行它,将一个changes.JSON文件添加到着陆区,重新运行管道,并正确地将上游应用到“raw_table”
(但是,每次在delta文件夹中创建一个包含所有数据的新的拼花文件时,我都会期望只添加一个带有插入和更新行的拼花文件?关于当前版本的一些信息保存在增量日志中吗?不确定这是否与我的问题有关。我已经将“raw_table”的raw_table改为enableChangeDataFeed = true。readStream for 'intermediate_table‘则有选项(readChangeFeed,’true‘)。
然后,我有以下代码从'raw_table‘转到’intermediate‘:
@dlt.table(name='V_raw_table', table_properties={delta.enableChangeDataFeed': 'True'})
def raw_table():
df = (spark.readStream
.format('delta')
.option('readChangeFeed', 'true')
.table('LIVE.raw_table'))
df = df.withColumn('ExtraCol', <Transformation>)
return df
ezeg
dlt.create_target_table('intermediate_table')
dlt.apply_changes(target='intermediate_table',
source='V_raw_table',
keys=['id'],
sequence_by='updated_at')不幸的是,当我运行这个程序时,我得到了一个错误:“检测到了一个数据更新(例如,在第2版的源表中检测到了part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet) )。这是目前不支持的。如果您想忽略更新,请将”ignoreChanges“选项设置为”true“。如果希望反映数据更新,请使用新的检查点目录重新启动此查询。
我登记了“不光彩的变化”,但我不认为这是我想要的。我希望自动加载程序能够检测到delta表中的更改,并通过流传递它们。
我知道readStream只适用于附加文件,但这就是为什么我希望在更新'raw_table‘之后,只使用插入和更新将一个新的拼板文件添加到增量文件夹中。然后,自动加载器会检测到这个添加的拼图文件,并可用于将更改应用到“intermediate”。
我是不是做错了?还是我忽略了什么?提前感谢!
发布于 2022-09-23 14:05:46
由于readStream只使用附加程序,所以源文件中的任何更改都会在下游产生问题。关于"raw_table“的更新只会插入一个新的拼花文件的假设是不正确的。基于诸如“优化写入”之类的设置,甚至没有它,apply_changes可以添加或删除文件。您可以在"numTargetFilesAdded“和"numTargetFilesRemoved”下的"raw_table/_delta_log/xxx.json“中找到此信息。
基本上,"Databricks建议您使用自动加载程序只摄入不可变的文件“。
发布于 2022-10-20 23:01:38
当您更改设置以包括选项'.option('readChangeFeed',‘true’)时,您应该以完全刷新开始(在start附近有下拉列表)。这样做将解决“检测到的数据更新xxx”错误,您的代码应该可以用于增量更新。
https://stackoverflow.com/questions/72629136
复制相似问题