首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >数据库中的StreamingQuery Delta表-描述历史

数据库中的StreamingQuery Delta表-描述历史
EN

Stack Overflow用户
提问于 2022-04-14 04:50:34
回答 1查看 1K关注 0票数 3

我有一个三角表,我正在阅读作为StreamingQuery。

纵观Delta历史,使用DESCRIBE History,我看到99%的OperationMetrics声明numTargetRowsUpdates is 0 (大多数操作都是插入的)。然而,有时会出现2-3个numTargetRowsUpdates > 1的情况,而Delta上的操作则是一个合并。

我仍然可以使用StreamingQuery并将这些数据作为流读取,还是会得到错误?即:

代码语言:javascript
复制
df: DataFrame = spark \
                .readStream \
                .format("delta") \
                .load(f"{table_location}") \

         df.writeStream \
                    .format("delta") \
                    .outputMode("append") \
                    .option("checkpointLocation", f "{checkpoint}/{table_location}")\
                    .trigger(once=True) \
                    .foreachBatch(process_batch) \
                    .start()

现在,我有了另一个Delta,它更像是客户信息的维度表,即电子邮件、名称、最后一次看到等等。我最初将其作为附加的StreamingQuery阅读,但我得到了以下错误:java.lang.UnsupportedOperationException: Detected a data update

纵观这个表,在“描述历史”中,我发现有许多更新正在发生。问:如果我将StreamQuery与IgnoreChanges, True结合使用,这是否会将更新的记录作为新记录发送,我可以在foreachBatch中进一步处理这些记录?

EN

回答 1

Stack Overflow用户

发布于 2022-04-14 09:01:43

如果您的增量源中有更新或删除,则读取流将引发异常。这一点在数据库文档:中也很清楚。

结构化流不处理非追加的输入,如果对用作源的表进行任何修改,则抛出异常。

如果使用IgnoreChanges, True,它将不会抛出异常,但它将为您提供可能已经处理的已更新的行+行。这是因为delta表中的所有内容都发生在文件级别上。例如,如果您更新文件中的单个行(大致如此),将发生以下情况:

  1. 查找并读取包含要更新的记录的文件
  2. 编写一个新文件,其中包含更新的记录+旧文件中的所有其他数据
  3. 将旧文件标记为已删除,并将新文件标记为在事务日志中添加。
  4. 您的读取流将读取整个新文件为“新”记录。这意味着你可以在你的蒸汽中得到副本。

在文档中也提到了这一点。

ignoreChanges:如果由于更新、合并、删除(分区内)或覆盖等数据更改操作而不得不在源表中重写文件,则重新处理更新。可能仍会发出未更改的行,因此您的下游使用者应该能够处理重复的行。..。

您必须决定用例是否合适。如果您需要专门处理更新和删除,databricks提供了变更数据馈送,您可以在delta表上启用它。这为您提供了有关插入、附加和删除的行级详细信息(代价是一些额外的存储和IO)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71866652

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档