
Flink streaming compression does not work with the Amazon S3 connector (StreamingFileSink and CompressWriterFactory)

Stack Overflow user
Asked on 2020-06-01 18:32:49
2 answers · 756 views · 0 followers · 0 votes

When I run an Apache Flink StreamingFileSink to AWS S3, the standard row format (forRowFormat) works fine.

StreamingFileSink<String> s3sink = StreamingFileSink
        .forRowFormat(new Path(s3Url),
                (String element, OutputStream stream) -> {
                    PrintStream out = new PrintStream(stream);
                    out.println(element);
                })
        .withBucketAssigner(new BucketAssigner())
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withMaxPartSize(100)
                .withRolloverInterval(30000)
                .build())
        .withBucketCheckInterval(100)
        .build();

When I run the same thing with the bulk format and a CompressWriterFactory:

StreamingFileSink<String> s3sink = StreamingFileSink
        .forBulkFormat(new Path(s3Url), 
                new CompressWriterFactory(new DefaultExtractor()))
        .withOutputFileConfig(outputFileConfig)
        .build();

it gives me the following error.

(Note: the CompressWriterFactory works fine with the HDFS scheme 'hdfs://host:port/path'.)

java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:112)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:126)
    at org.apache.flink.formats.compress.writers.NoCompressionBulkWriter.finish(NoCompressionBulkWriter.java:56)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:62)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:250)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:241)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:422)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    ...
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

Notes:

  1. Flink version 1.10.0
  2. s3Url = "s3a://bucket/folder/path"

2 Answers

Stack Overflow user

Posted on 2020-06-05 18:15:45

This looks like a bug. You could open a JIRA ticket with the description included here.

Votes: 0

Stack Overflow user

Posted on 2020-10-07 17:26:48

Just found the solution to this problem: specify a Hadoop compression codec:

CompressWriters.forExtractor(new DefaultExtractor()).withHadoopCompression("GzipCodec")
Votes: 0
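Putting the answer together with the question's original sink, the fixed bulk-format sink would look roughly like this. This is a sketch, not a verified fix: it assumes Flink 1.10 with the flink-compress module and an S3 filesystem plugin on the classpath, "GzipCodec" refers to Hadoop's built-in codec, and the bucket path is a placeholder. The key change is that withHadoopCompression makes the writer finish through the codec instead of calling sync(), which S3RecoverableFsDataOutputStream does not support.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch: bulk-format S3 sink using a named Hadoop compression codec.
StreamingFileSink<String> s3sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://bucket/folder/path"),   // placeholder URL
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                        .withHadoopCompression("GzipCodec"))   // Hadoop GzipCodec
        .withOutputFileConfig(outputFileConfig)                // as in the question
        .build();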
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/62138635