我使用PySpark将几个文件读入数据帧,并执行它们的联合。因为这两个文件具有不同的权限授予,所以我使用org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider来读取这两个文件。但是,当我尝试读取第二个文件(我可以单独读取这两个文件中的任何一个,但不能一起读取)时,我得到一个错误。
读取文件的代码:
def read_file(file_path, file_id):
aws_tokens = get_aws_tokens_for_file(file_id)
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', aws_access_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', aws_secret_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.session.token', aws_session_token)
spark._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', 'true')
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
df = spark.read.parquet(file_path)
return df现在,下面的代码在两个不同的pyspark-session中工作:
df1 = read_file(file_1_path, file_1_id)df2 = read_file(file_2_path, file_2_id)但是,当我尝试执行以下操作时,它会失败,并显示java.nio.file.AccessDeniedException
df1 = read_file(file_1_path, file_1_id)
df2 = read_file(file_2_path, file_2_id)
df3 = df1.union(df2)
print(df3.count())其中一个原因可能是,只有在执行操作时才实际从s3读取文件,并且当执行该操作时,两个文件所需的aws-credentials是不同的。
因此,我尝试持久化第一个文件,然后读取第二个文件,但失败了,也出现了相同的异常:
df1 = read_file(file_1_path, file_1_id)
_ = df1.persist(StorageLevel.MEMORY_AND_DISK).count()
df2 = read_file(file_2_path, file_2_id) #fails here itself那么,我如何执行两个需要不同aws身份验证凭据的此类文件的联合?
发布于 2020-11-02 20:21:00
Spark执行按需计算,包括读取数据。S3A FileSystem类的实例将由存储桶URI too...changing缓存,只有当存储桶不同时,配置才会生效。
您可以使用每个存储桶设置来更改不同s3存储桶的凭据/凭据提供程序。如果您的数据位于不同的存储桶中,这应该是可行的。有关细节,请参阅hadoop s3文档。
https://stackoverflow.com/questions/64643503
复制相似问题