首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >配置FileSink以处理S3上的.gz文件

配置FileSink以处理S3上的.gz文件
EN

Stack Overflow用户
提问于 2022-07-19 15:30:43
回答 1查看 90关注 0票数 1

我确实从flink 1.11升级到1.15.1在S3上编写S3文件。由于connectors.fs现在已经过时,我必须更改我以前使用过的接收器: EventuallyConsistentBucketingSink:

代码语言:javascript
复制
https://github.com/sjwiesman/flink/blob/83a6400e2587b067d08a64bc7e10edd4b57e71b4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/consistent/EventuallyConsistentBucketingSink.java

对于一些新事物,我使用FileSink:

代码语言:javascript
复制
return FileSink.forRowFormat(
                new Path(outputBasePath), new Encoder<T>() {
                    @Override
                    public void encode(T record, OutputStream stream)
                        throws IOException {
                        GzipParameters params = new GzipParameters();
                        params.setCompressionLevel(Deflater.BEST_COMPRESSION);
                        GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
                        OBJECT_MAPPER.writeValue(out, record);
                        out.finish();
                    }

                })
            .withBucketAssigner(new BasePathBucketAssigner<>())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();

其中outputBasePath是一些s3:// URI。我发现了一个错误:

代码语言:javascript
复制
Exception in thread "main" java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:215)
    at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
    at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
    at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    at org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo.toString(CommittableMessageTypeInfo.java:120)

我在这里错过了什么?

EN

回答 1

Stack Overflow用户

发布于 2022-07-19 21:10:42

如果使用hadoop,建议使用"s3a://“作为模式。

我的线人是这里

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73039740

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档