如何理解用upsert编写的hudi操作,而df保存了append呢?既然这将重新插入记录,为什么要追加而不是覆盖?有什么关系呢?如图所示:

发布于 2022-07-26 03:46:47
示例:插入一个DataFrame,为recordKey => _row_key、partitionPath =>分区和precombineKey =>时间戳指定必要的字段名
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);生成一些新的旅行,将它们加载到DataFrame中,然后将DataFrame写入Hudi表,如下所示。
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)发布于 2022-08-07 15:03:02
当您使用overwrite模式时,您告诉spark删除该表并重新创建它(如果使用动态partitionOverwriteMode,则只需重新创建新df中的分区)。
但是,当我们使用append模式时,spark会将新数据附加到磁盘/云存储中现有的旧数据中。使用hudi,我们可以提供额外的操作来合并两个版本的数据,更新新数据中有密钥的旧记录,保留新数据中没有密钥的旧记录,并添加具有新键的新记录。这与覆盖数据完全不同。
https://stackoverflow.com/questions/73117484
复制相似问题