I upgraded from Flink 1.11 to 1.15.1 and am writing files to S3. Since connectors.fs is now deprecated, I had to replace the sink I previously used, EventuallyConsistentBucketingSink:
https://github.com/sjwiesman/flink/blob/83a6400e2587b067d08a64bc7e10edd4b57e71b4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/consistent/EventuallyConsistentBucketingSink.java

As its replacement I use FileSink:
    return FileSink.forRowFormat(
            new Path(outputBasePath),
            new Encoder<T>() {
                @Override
                public void encode(T record, OutputStream stream) throws IOException {
                    GzipParameters params = new GzipParameters();
                    params.setCompressionLevel(Deflater.BEST_COMPRESSION);
                    GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
                    OBJECT_MAPPER.writeValue(out, record);
                    out.finish();
                }
            })
            .withBucketAssigner(new BasePathBucketAssigner<>())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();

where outputBasePath is an s3:// URI. I get the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:215)
at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
	at org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo.toString(CommittableMessageTypeInfo.java:120)

What am I missing here?
Posted on 2022-07-19 21:10:42
If you are writing through Hadoop, use "s3a://" rather than "s3://" as the URI scheme.
My source for this is here:
https://stackoverflow.com/questions/73039740
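In other words, only the scheme of outputBasePath needs to change; the rest of the FileSink setup can stay as it is. A minimal sketch of that rewrite (the helper name and the bucket path are hypothetical, for illustration only):

```java
import java.net.URI;

public class SchemeFix {
    // Hypothetical helper: rewrite an s3:// base path to s3a:// so the
    // write goes through the S3A filesystem instead of the generic
    // Hadoop wrapper, whose recoverable writer only supports HDFS.
    static String toS3a(String outputBasePath) {
        URI uri = URI.create(outputBasePath);
        if ("s3".equals(uri.getScheme())) {
            return "s3a" + outputBasePath.substring("s3".length());
        }
        return outputBasePath; // already s3a://, hdfs://, file://, etc.
    }

    public static void main(String[] args) {
        // "my-bucket" is a placeholder bucket name.
        System.out.println(toS3a("s3://my-bucket/events/"));
    }
}
```

The rewritten path is then what you hand to new Path(...) when building the FileSink. Note that the S3A scheme is only resolvable if the matching Flink S3 filesystem plugin is on the classpath of the cluster.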