首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法读取flink中带有前缀的s3文件

无法读取flink中带有前缀的s3文件
EN

Stack Overflow用户
提问于 2020-04-21 21:49:36
回答 1查看 427关注 0票数 0

我正在flink集群上部署作业。S3路径类似于:

网址: s3a://bucket-name/pre/pre1/original/2019_12_19/file.parquet.gz

我把它理解为:

代码语言:javascript
复制
  val job = Job.getInstance
        FileInputFormat.addInputPath(
          job,
          new org.apache.hadoop.fs.Path(url)
        )

        val hadoopInputFormat =
          new HadoopInputFormat(
            new AvroParquetInputFormat[GenericRecord],
            classOf[Void],
            classOf[GenericRecord],
            job
          )
        val data: List[tuple.Tuple2[Void, GenericRecord]] =
          env.createInput(hadoopInputFormat).collect().asScala.toList

尽管作业正在使用sbt run运行文件,但作业仍未提交。

此外,如果部署了作业并且S3 URL的类型为:

URL: s3a://bucket-name/2019_12_19/file.parquet.gz

提交成功。

代码语言:javascript
复制
 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(parallelism)

依赖关系:

代码语言:javascript
复制
    "org.apache.flink" %% "flink-scala" % "1.10.0" % "provided"
    "org.apache.flink" % "flink-s3-fs-hadoop" % "1.10.0"
    "org.apache.flink" %% "flink-hadoop-compatibility" % "1.10.0"
    "org.apache.hadoop" % "hadoop-mapreduce-client-core" % ""3.1.1""
    "org.apache.hadoop" % "hadoop-aws" % "2.7.2"
    "org.apache.httpcomponents" % "httpcore" % "4.2.5"
    "org.apache.httpcomponents" % "httpclient" % "4.2.5"
    "org.apache.flink" %% "flink-streaming-scala" % "1.10.0" % "provided"

Exception:

    Caused by: java.io.IOException: Class class com.amazonaws.auth.InstanceProfileCredentialsProvider does not implement AWSCredentialsProvider
        at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:623)
        at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:566)
        at org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:52)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:302)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:274)
        at org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:349)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:396)
        at org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:304)
        at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:159)
        at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:59)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)



core-site.xml:




    <configuration>
        <property>
            <name>fs.s3a.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        </property>

        <property>
            <name>fs.s3a.buffer.dir</name>
            <value>/tmp</value>
        </property>

        <property>
            <name>fs.s3a.access.key</name>
            <value>iam</value>
        </property>

        <property>
            <name>fs.s3a.secret.key</name>
            <value>iam</value>
        </property>

        <property>
            <name>fs.s3a.endpoint</name>
            <value>https://s3.us-east-1.amazonaws.com</value>
        </property>

        <property>
            <name>fs.s3a.path.style.access</name>
            <value>true</value>
        </property>

        <property>
            <name>fs.s3a.aws.credentials.provider</name>
            <value>
                com.amazonaws.auth.InstanceProfileCredentialsProvider,
                org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,
                com.amazonaws.auth.EnvironmentVariableCredentialsProvider
            </value>
        </property>
    </configuration>
EN

回答 1

Stack Overflow用户

发布于 2020-04-22 19:45:26

从Flink 1.10开始,只能将flink-s3-fs-hadoop用作plugin

您基本上需要将jar添加到您的flink-dist中,如下所示。

代码语言:javascript
复制
flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        └── flink-s3-fs-hadoop.jar

然后把它从你的肥肉罐子里拿出来。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61345185

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档