I have developed a PySpark Glue job that loads a dataset in complete/incremental mode. It works fine. After loading the dataset I have to perform a small aggregation and write it to a single location in "overwrite"/"append" mode. For this I wrote the following code:
maxDateValuePath = "s3://...../maxValue/"
outputPath = "s3://..../complete-load/"
aggregatedPath = "s3://...../aggregated-output/"
fullLoad = ""
aggregatedView = ""
completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"

data.createOrReplaceTempView("data")
aggregatedView = spark.sql("""
    select catid, count(*) as number_of_catids from data
    group by catid""")
if incrementalLoad == str(0):
    aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
    aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
elif incrementalLoad == str(1):
    aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
    log.info("step 123: " + str(aggregatedView.count()))
    aggregatedView.write.mode("append").parquet(completeAggregatedPath)
    aggregatedView = spark.read.parquet(completeAggregatedPath)
    log.info("step 126: " + str(aggregatedView.count()))
    w = Window.partitionBy("catid").orderBy(col("created_at").desc())
    aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop("rw")
    log.info("step 130: " + str(aggregatedView.count()))
    aggregatedView.orderBy(col("created_at").desc()).show()
    print("::::::::::::before writing::::::::::::::")
    aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)

Here 0 and 1 denote a full load and an incremental load respectively. Before writing the transformed dataset I add a created_at column, which I use to pick the latest aggregated record per key after the incremental dataset has been written; otherwise it leads to duplicates.
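The keep-only-the-latest-record-per-key step that the window function performs can be illustrated in plain Python (a minimal in-memory sketch of the same logic, not the Spark code above; the sample rows are made up):

```python
# Sample rows mimicking the aggregated view after an append:
# the same catid can appear more than once, with different created_at values.
rows = [
    {"catid": "a", "number_of_catids": 3, "created_at": 1},
    {"catid": "a", "number_of_catids": 5, "created_at": 2},  # newer row for "a"
    {"catid": "b", "number_of_catids": 7, "created_at": 1},
]

# Equivalent of row_number() over (partition by catid order by created_at desc) == 1:
# for each catid, keep only the row with the greatest created_at.
latest = {}
for row in rows:
    kept = latest.get(row["catid"])
    if kept is None or row["created_at"] > kept["created_at"]:
        latest[row["catid"]] = row

deduped = sorted(latest.values(), key=lambda r: r["catid"])
print(deduped)
```

Without this step, every incremental append would leave stale copies of already-aggregated catids in the output.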
Everything runs fine, but the problem is that when I try to write the dataset in overwrite mode in the incremental part, with the line aggregatedView.write.mode("overwrite").parquet(aggregatedPath), the prefix is deleted from the S3 bucket and the write then fails with:
Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Why is the bucket being deleted?
Posted on 2021-10-19 08:31:33
So, I solved my problem by changing the following line of code:

aggregatedView2 = spark.read.parquet(completeAggregatedPath)

This way there is a separate DataFrame lineage for the aggregated view. Because the read and the write were performed on the same S3 location through the same DataFrame lineage, Spark deleted the prefix: the source data of the DataFrame had become ambiguous. So I created a new DataFrame that reads from the S3 location instead of from the preceding transformations.

Hope this helps!
Posted on 2021-10-14 02:28:59
The problem is in your code. You are reading from and writing to the same location in the incremental part:
aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)

Since Spark evaluates lazily, when you specify the mode as overwrite it first clears the data in that folder, leaving you nothing to read. Only when execution reaches the write part of the code does Spark actually start reading the data, and by that time your data has already been wiped out by the write operation.
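This failure mode can be reproduced outside of Spark with any lazy read of a file that is truncated before it is consumed (a minimal plain-Python sketch of the concept, not Spark itself; the file path is made up):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "data.txt")
with open(path, "w") as f:
    f.write("line1\nline2\n")

# Unsafe pattern: hold a lazy reader on the path, then overwrite the same path.
# Truncating the file before the reader is consumed loses the data,
# just as Spark's overwrite clears the folder before the lazy read runs.
lazy_reader = open(path)           # nothing is read yet
open(path, "w").close()            # "overwrite": truncates the file
lost = lazy_reader.read()          # too late: the data is gone
lazy_reader.close()

# Safe pattern: materialize the data fully before overwriting the source.
with open(path, "w") as f:
    f.write("line1\nline2\n")
with open(path) as f:
    materialized = f.read()        # eager read, analogous to collecting/caching
with open(path, "w") as f:
    f.write(materialized.upper())  # the overwrite can no longer clobber the input
```

In Spark terms, the "materialize first" step corresponds to writing to a different path (as both answers above do) or breaking the lineage before the overwrite.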
https://stackoverflow.com/questions/69561478