我每小时都使用spark hudi进行增量查询,每次都将增量查询的开始和结束时间保存在db(例如mysql)中。对于nexti mysql查询,我使用开始时间作为上一次从mysql获取查询的结束时间。
增量查询应该如下所示:
hudi_incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': hudi_start_commit,
'hoodie.datasource.read.end.instanttime': hudi_end_commit
}但我不知道如何在pyspark(python)中找到hudi_end_commit。在Java中,我可以对helper类HoodieDataSourceHelpers做同样的操作,例如:
String hudi_end_commit = HoodieDataSourceHelpers.latestCommit(FileSystem.get(javaSparkContext.hadoopConfiguration()),l1BasePath);但无法在python中找到同样的解决方案。
经过一次工作之后,我找到了一个不适合大型数据集的解决方案。
spark_session.read.format("hudi").load(l1_base_path).createOrReplaceTempView("hudi_trips_snapshot")
commits = list(map(lambda row: row[0],
spark_session.sql("select distinct(_hoodie_commit_time) as commitTime from "
"hudi_trips_snapshot order by commitTime desc").limit(1).collect()))但是,当数据大小太大时,它会加载整个数据以获得hudi提交,这比读取实际数据本身花费更多的时间。
有什么简单的方法可以找到hudi最近/最后一次提交。
发布于 2022-05-25 18:13:45
试试这个(在pyspark shell中为我工作):
hudi_end_commit = spark._sc._gateway.jvm.org.apache.hudi.HoodieDataSourceHelpers.latestCommit(
spark._sc._gateway.jvm.org.apache.hadoop.fs.FileSystem.get(spark._sc._jsc.hadoopConfiguration()),
"/path/to/hudi/table"
)https://stackoverflow.com/questions/72254098
复制相似问题