首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将增量文件写入S3 (MinIO) - PySpark 2.4.3

将增量文件写入S3 (MinIO) - PySpark 2.4.3
EN

Stack Overflow用户
提问于 2019-09-08 19:36:59
回答 1查看 2.2K关注 0票数 0

目前,我正在尝试将delta-lake parquet文件写入S3,并在本地用MinIO替换该文件。

我可以很好地将标准的parquet文件读写到S3中。

但是,当我使用三角洲湖实例

配置达美调至s3

我似乎不能把delta_log/写到我的MinIO上。

所以我试着设置:fs.AbstractFileSystem.s3a.implfs.s3a.impl

我正在使用pyspark[sql]==2.4.3,这是我在当前venv中使用的。

src/.env

代码语言:javascript
复制
# pyspark packages
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.3
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.3
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}

src/spark_session.py

代码语言:javascript
复制
# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
# hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  when using hadoop 2.8.5
# hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  alternative to above hadoop 2.8.5
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("spark.history.fs.logDirectory", 's3a://spark-logs-test/')

src/apps/raw_to_parquet.py

代码语言:javascript
复制
# Trying to write pyspark dataframe to MinIO (S3)

raw_df.coalesce(1).write.format("delta").save(s3_url)

bash

代码语言:javascript
复制
# RUN CODE
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) src/run_onlineretailer.py

hadoop-common: 2.7.3hadoop-aws: 2.7.3java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3a.S3AFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)出错

因此,有了这个错误,我更新为hadoop-common: 2.8.5hadoop-aws: 2.8.5,以修复NoSuchMethodException。因为delta需要:S3AFileSystem

py4j.protocol.Py4JJavaError: An error occurred while calling o89.save. : java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders(Lorg/apache/hadoop/conf/Configuration;Ljava/lang/Class;)Lorg/apache/hadoop/conf/Configuration

因此,在我看来,parquet文件似乎可以在没有问题的情况下编写,但是,delta创建了这些无法识别的delta_log文件夹(我认为?)。

电流源代码

阅读几个不同的类似问题,但似乎没有人尝试使用delta lake文件。

更新

它目前与以下设置一起工作:

代码语言:javascript
复制
#pyspark packages
DELTA_LOGSTORE = spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.7
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.7
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}
PYSPARK_CONF_ARGS = ${DELTA_LOGSTORE}
代码语言:javascript
复制
# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
代码语言:javascript
复制
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) --conf $(PYSPARK_CONF_ARGS) src/run_onlineretailer.py

奇怪的是它只会像这样工作。

如果我试图用sc.confhadoop_conf设置它,请参阅未注释的代码:

代码语言:javascript
复制
def spark_init(self) -> SparkSession:

    sc: SparkSession = SparkSession \
        .builder \
        .appName(self.app_name) \
        .config("spark.sql.warehouse.dir", self.warehouse_location) \
        .getOrCreate()

    # set log level
    sc.sparkContext.setLogLevel("WARN")

    # Enable Arrow-based columnar data transfers
    sc.conf.set("spark.sql.execution.arrow.enabled", "true")

    # sc.conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    # configure s3 connection for read/write operation (native spark)
    hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
    hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
    hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
    #hadoop_conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    return sc

如果有人能解释的话,那就太好了。是因为.getOrCreate()吗?似乎不可能在没有此调用的情况下设置conf?除非在运行应用程序时在命令行中。

EN

回答 1

Stack Overflow用户

发布于 2019-09-09 10:39:46

您正在混合hadoop-* jar;就像星星之火一样,只有当它们都来自同一个版本时,它们才能工作。

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57845157

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档