我试图获取表(Delta表)最后一次优化的时间,使用下面的代码和获得预期的输出。此代码将用于数据库中存在的所有表。
table_name_or_path = "abcd"
df = spark.sql("desc history {}".format(table_name_or_path)).select("operation","timestamp").filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc())
if len(df.take(1)) != 0:
last_optimize = df.select(col("timestamp").cast("string").alias("timestamp")).first().asDict()
print(last_optimize["timestamp"])
last_optimize = last_optimize["timestamp"]
else:
last_optimize = ""上面的代码将需要一些时间,它将触发许多火花作业。
我想优化上面的代码以获得更好的性能。
是否有任何方法来编写优化的代码,这将更有帮助。
发布于 2022-08-18 07:43:20
最好避免像if len(df.take(1)) != 0这样的检查,因为它可能导致在以后执行.first()时重新计算结果。相反,只需使用.limit(1)限制行数,并检查收集项的结果。类似的东西(未经测试):
table_name_or_path = "abcd"
df = spark.sql(f"desc history {table_name_or_path}") \
.select("operation","timestamp") \
.filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc()) \
.limit(1)
data = df.collect()
if len(data) > 0:
last_optimize = data[0].asDict()
print(last_optimize["timestamp"])
last_optimize = last_optimize["timestamp"]
else:
last_optimize = ""发布于 2022-08-18 06:01:45
通常,在开始对数据帧进行任何计算之前,当您缓存数据帧时,它通常会有所帮助。
df = spark.sql("desc history {}".format(table_name_or_path)).select("operation","timestamp").filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc()).cache()我假设这里缓存的orderBy步骤已经减少了计算量
https://stackoverflow.com/questions/73398052
复制相似问题