我目前正在使用Flink 1.7 +gcs-连接器库。我试图让StreamingFileSink写到GCS桶中,并遇到以下异常:
我遇到了一个Jira:https://issues.apache.org/jira/browse/FLINK-11838 --但我不清楚代码是否曾经合并过。
在需要做什么方面有什么帮助是非常感谢的,以使这一工作?
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-core-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) ~[flink-runtime_2.11-1.7.0.jar:1.7.0]
at java.lang.Thread.run(Thread.java:834) ~[?:?]发布于 2019-12-03 08:07:59
据我所知,它并没有合并。
问题是,为了让Flink接收器组件参与检查点的保存和恢复,它需要能够重置接收器组件正在写入的任何内容的状态。对于写入文件的接收器,这意味着接收器可能必须截断文件并开始附加文件,以便将文件返回到上一个检查点时文件所在的状态。GCS和亚马逊的S3一样,是二进制对象存储,而不是真正的文件系统。虽然您可以使用二进制存储来完成大部分与文件系统有关的事情,但是您不能截断二进制对象,也不能将其附加到其中。您可以将其显示为截断并附加到二进制对象,但它是一个非常低效的API层。因此,尝试在GCS (或S3)上使用S3并不是很有效。
您可能最好写到一个实际的文件系统,然后添加最后一步,以传输写入的文件到GCS。这意味着您可能需要编写另一个接收器,以便检查点体系结构涵盖行为,但这是您的最佳选择。我认为。
https://stackoverflow.com/questions/59149297
复制相似问题