首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从亚马逊网络服务s3存储桶中读取csv时出错

从亚马逊网络服务s3存储桶中读取csv时出错
EN

Stack Overflow用户
提问于 2021-06-04 00:19:16
回答 1查看 143关注 0票数 0

我正在使用一个docker-compose文件,在其中运行一个spark-cluster。SPARK_VERSION="3.0.0“HADOOP_VERSION="3.2”

所有文件都可以在以下Github链接中找到:https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker

我正在尝试使用以下代码从aws s3读取csv文件:

代码语言:javascript
复制
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.874,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField,StringType,IntegerType,DoubleType, LongType,StructType


sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("pyspark-2")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

AWS_ACCESS_KEY_ID = "AKIAXJFEEOGKW4RSZG3B"
AWS_SECRET_ACCESS_KEY = "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"

sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf=sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
hadoop_conf.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark.read.csv("s3a://<PATH>)

但我收到以下错误:

代码语言:javascript
复制
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-24141d88d3e8> in <module>
----> 1 df = spark.read.csv("s3a://migrationawsbucket/allevents.csv")

/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in _call_(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.csv.
: java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.StreamCapabilities
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 41 more

我已经尝试了在此链接中找到的不同解决方案:How can I read from S3 in pyspark running in local mode?

-更新

我已经尝试按照stevel的建议更改aws-java-sdk-bundlehadoop-aws,不同的版本可以在:https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws中找到

我尝试使用:hadoop-aws-3.2.0.jaraws-java-sdk-bundle-1.11.874.jar,答案如下:AWS EKS Spark 3.0, Hadoop 3.2 Error - NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException

但同样的错误

EN

回答 1

Stack Overflow用户

发布于 2021-06-04 01:18:04

hadoop-aws JAR和aws-sdk之间根本不匹配。恐怕他们不会混在一起。

使用mvnrepo计算出spark拥有的hadoop-*工件所需的确切版本。不要考虑混合hadoop-*工件版本,因为它们与spark JARs一样紧密集成

我建议您使用hadoop-3.2.x或更高版本的工件升级到spark版本,这些版本与最新的1.11 AWS SDK兼容。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67825167

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档