首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >编写拼花文件时,s3桶将被删除。

编写拼花文件时,s3桶将被删除。
EN

Stack Overflow用户
提问于 2021-10-13 19:44:13
回答 2查看 673关注 0票数 0

我已经开发了一个用于加载Pyspark Glue数据集的complete/incremental作业。它很好用。加载数据集之后,我必须执行少量的"overwrite"/"append",并以"overwrite"/"append"模式在单个位置写入它。为此,我编写了以下代码:

代码语言:javascript
复制
        maxDateValuePath = "s3://...../maxValue/"
        outputPath = "s3://..../complete-load/"
        aggregatedPath = "s3://...../aggregated-output/"
        fullLoad = ""
        aggregatedView = ""
        completeAggregatedPath = "s3://...../aggregated-output/step=complete-load/"
        incrAggregatedPath = "s3://....../aggregated-output/step=incremental-load/"
       
        aggregatedView=""
        data.createOrReplaceTempView("data")
        aggregatedView = spark.sql("""
        select catid,count(*) as number_of_catids from data 
        group by catid""")
    if (incrementalLoad == str(0)):
        aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
        aggregatedView.write.mode("overwrite").parquet(completeAggregatedPath)
    elif (incrementalLoad == str(1)):
        aggregatedView = aggregatedView.withColumn("created_at", current_timestamp())
        log.info("step 123: " + str(aggregatedView.count()))
        aggregatedView.write.mode("append").parquet(completeAggregatedPath)
        aggregatedView = spark.read.parquet(completeAggregatedPath)
        log.info("step 126: " + str(aggregatedView.count()))
        w = Window.partitionBy("catid").orderBy(col("created_at").desc())
        aggregatedView = aggregatedView.withColumn("rw", row_number().over(w)).filter(col("rw") == lit(1)).drop(
            "rw")
        log.info("step 130: " + str(aggregatedView.count()))
        log.info(aggregatedView.orderBy(col("created_at").desc()).show())
        print("::::::::::::before writing::::::::::::::")
        aggregatedView.write.mode("overwrite").parquet(incrAggregatedPath)

其中01代表满载/增量负载。现在,在编写转换数据集之前,我要添加一个created_at列,用于在编写增量数据集之后处理最新的聚合记录,否则会导致重复。

一切正常运行,但问题是,当我试图使用增量部件的此行aggregatedView.write.mode("overwrite").parquet(aggregatedPath)以覆盖模式编写数据集时,存储桶在s3中被删除,此操作将导致下面的aggregatedView.write.mode("overwrite").parquet(aggregatedPath)

代码语言:javascript
复制
    Caused by: java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

为什么桶被删除了?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-10-19 08:31:33

因此,我通过更改代码的下面一行来解决我的问题:

代码语言:javascript
复制
aggregatedView2 = spark.read.parquet(completeAggregatedPath)

因此,对于聚合视图,将有一个df谱系。由于在相同的s3位置和相同的df沿袭上执行读和写,所以它删除前缀,因为df的源数据不明确。因此,创建了一个新的df,其中它将查找S3位置,而不是前面的转换。

也许能帮上忙!

票数 0
EN

Stack Overflow用户

发布于 2021-10-14 02:28:59

问题在于你的代码。您正在对增量部分中的相同位置进行读写。

代码语言:javascript
复制
aggregatedView = spark.read.parquet(aggregatedPath)
...
...
aggregatedView.write.mode("overwrite").parquet(aggregatedPath)  

由于spark做的是一个懒惰的评估,所以当您将模式指定为覆盖时,它会清除特定文件夹中的数据,从而使您没有什么可读的。当它到达代码的写部分时,它开始读取数据,此时您的数据已被写入操作清除。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69561478

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档