首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Databricks Delta Live表-应用对delta表的更改

Databricks Delta Live表-应用对delta表的更改
EN

Stack Overflow用户
提问于 2022-06-15 09:42:45
回答 2查看 1.7K关注 0票数 5

我正在使用Databricks Delta表,但是在上游的一些表上有一些问题。我知道这是一个相当长的文本下面,但我试图描述我的问题尽可能清楚。如果有些地方不清楚,请告诉我。

我有以下表格和流程:

Landing_zone ->这是一个添加JSON文件的文件夹,其中包含插入或更新记录的数据。Raw_table -> --这是JSON文件中的数据,但是是表格格式的。这张表是三角形格式的。不进行转换,除非将JSON结构转换为表格结构(我做了一个keys,然后从JSON键创建列)。Intermediate_table -> --这是raw_table,但是有一些额外的列(取决于其他列值)。

要从我的着陆区到原始表,我有下面的Pyspark代码:

代码语言:javascript
复制
cloudfile = {"cloudFiles.format":"JSON", 
                       "cloudFiles.schemaLocation": sourceschemalocation, 
                       "cloudFiles.inferColumnTypes": True}

@dlt.view('landing_view')
def inc_view():
    df = (spark
             .readStream
             .format('cloudFiles')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')

此代码按预期工作。我运行它,将一个changes.JSON文件添加到着陆区,重新运行管道,并正确地将上游应用到“raw_table”

(但是,每次在delta文件夹中创建一个包含所有数据的新的拼花文件时,我都会期望只添加一个带有插入和更新行的拼花文件?关于当前版本的一些信息保存在增量日志中吗?不确定这是否与我的问题有关。我已经将“raw_table”的raw_table改为enableChangeDataFeed = true。readStream for 'intermediate_table‘则有选项(readChangeFeed,’true‘)。

然后,我有以下代码从'raw_table‘转到’intermediate‘:

代码语言:javascript
复制
@dlt.table(name='V_raw_table', table_properties={delta.enableChangeDataFeed': 'True'})
def raw_table():
     df = (spark.readStream
                .format('delta')
                .option('readChangeFeed', 'true')
                .table('LIVE.raw_table'))
     df = df.withColumn('ExtraCol', <Transformation>)
     return df
 ezeg
dlt.create_target_table('intermediate_table')
dlt.apply_changes(target='intermediate_table',
                  source='V_raw_table',
                  keys=['id'],
                  sequence_by='updated_at')

不幸的是,当我运行这个程序时,我得到了一个错误:“检测到了一个数据更新(例如,在第2版的源表中检测到了part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet) )。这是目前不支持的。如果您想忽略更新,请将”ignoreChanges“选项设置为”true“。如果希望反映数据更新,请使用新的检查点目录重新启动此查询。

我登记了“不光彩的变化”,但我不认为这是我想要的。我希望自动加载程序能够检测到delta表中的更改,并通过流传递它们。

我知道readStream只适用于附加文件,但这就是为什么我希望在更新'raw_table‘之后,只使用插入和更新将一个新的拼板文件添加到增量文件夹中。然后,自动加载器会检测到这个添加的拼图文件,并可用于将更改应用到“intermediate”。

我是不是做错了?还是我忽略了什么?提前感谢!

EN

回答 2

Stack Overflow用户

发布于 2022-09-23 14:05:46

由于readStream只使用附加程序,所以源文件中的任何更改都会在下游产生问题。关于"raw_table“的更新只会插入一个新的拼花文件的假设是不正确的。基于诸如“优化写入”之类的设置,甚至没有它,apply_changes可以添加或删除文件。您可以在"numTargetFilesAdded“和"numTargetFilesRemoved”下的"raw_table/_delta_log/xxx.json“中找到此信息。

基本上,"Databricks建议您使用自动加载程序只摄入不可变的文件“。

票数 0
EN

Stack Overflow用户

发布于 2022-10-20 23:01:38

当您更改设置以包括选项'.option('readChangeFeed',‘true’)时,您应该以完全刷新开始(在start附近有下拉列表)。这样做将解决“检测到的数据更新xxx”错误,您的代码应该可以用于增量更新。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72629136

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档