我有一个三角表,我正在阅读作为StreamingQuery。
纵观Delta历史,使用DESCRIBE History,我看到99%的OperationMetrics声明numTargetRowsUpdates is 0 (大多数操作都是插入的)。然而,有时会出现2-3个numTargetRowsUpdates > 1的情况,而Delta上的操作则是一个合并。
我仍然可以使用StreamingQuery并将这些数据作为流读取,还是会得到错误?即:
df: DataFrame = spark \
.readStream \
.format("delta") \
.load(f"{table_location}") \
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", f "{checkpoint}/{table_location}")\
.trigger(once=True) \
.foreachBatch(process_batch) \
.start()现在,我有了另一个Delta,它更像是客户信息的维度表,即电子邮件、名称、最后一次看到等等。我最初将其作为附加的StreamingQuery阅读,但我得到了以下错误:java.lang.UnsupportedOperationException: Detected a data update
纵观这个表,在“描述历史”中,我发现有许多更新正在发生。问:如果我将StreamQuery与IgnoreChanges, True结合使用,这是否会将更新的记录作为新记录发送,我可以在foreachBatch中进一步处理这些记录?
发布于 2022-04-14 09:01:43
如果您的增量源中有更新或删除,则读取流将引发异常。这一点在数据库文档:中也很清楚。
结构化流不处理非追加的输入,如果对用作源的表进行任何修改,则抛出异常。
如果使用IgnoreChanges, True,它将不会抛出异常,但它将为您提供可能已经处理的已更新的行+行。这是因为delta表中的所有内容都发生在文件级别上。例如,如果您更新文件中的单个行(大致如此),将发生以下情况:
在文档中也提到了这一点。
ignoreChanges:如果由于更新、合并、删除(分区内)或覆盖等数据更改操作而不得不在源表中重写文件,则重新处理更新。可能仍会发出未更改的行,因此您的下游使用者应该能够处理重复的行。..。
您必须决定用例是否合适。如果您需要专门处理更新和删除,databricks提供了变更数据馈送,您可以在delta表上启用它。这为您提供了有关插入、附加和删除的行级详细信息(代价是一些额外的存储和IO)。
https://stackoverflow.com/questions/71866652
复制相似问题