I am trying to deploy an Apache Beam pipeline on a Flink Docker cluster running locally.
The pipeline fails with the following error:
The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.<init>(FlinkExecutionEnvironments.java:319)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:177)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:139)
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:98)
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
ApacheBeamPocJava.main(ApacheBeamPocJava.java:262)

This is how I set up the pipeline:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("localhost:6123");
options.setFilesToStage(Arrays.asList("path to the beam jar"));
FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);

After defining the pipeline steps, I run it like this:
flinkRunner.run(p);

This is how I submit the job:
flink run -c ClassName PATH_TO_JAR

Can someone tell me what is wrong here?
Also, if anyone has a Java <-> Beam <-> Flink example at hand, I would really appreciate it.
Posted on 2020-11-05 17:03:00
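It looks like you have defined the execution environment inside the pipeline itself. Have you tried launching your pipeline as described in the Flink runner documentation? (That is, remove the parts of your code that select or configure the runner.)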
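Because Beam is a framework that decouples your code from the runner that executes it, there is no need to configure the Flink runner in the pipeline code itself. If you can execute your pipeline locally with the direct runner, it will also work on the Flink runner (or any other supported runner) when compiled with the correct profile.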
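As a minimal sketch of such a runner-agnostic entry point: the main method below builds its options only from the command-line arguments and never names FlinkRunner or a JobManager address, so the same jar could be run with the direct runner locally or submitted to Flink. The class name RunnerAgnosticPipeline, the in-memory input, and the word-splitting step are illustrative assumptions, not code from the question, and the sketch assumes beam-sdks-java-core plus whichever runner artifact your build profile adds are on the classpath.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class: no runner is chosen or configured in code.
public class RunnerAgnosticPipeline {

  // Pure helper so the transform logic can be checked without any runner.
  static List<String> splitWords(String line) {
    return Arrays.asList(line.trim().split("\\s+"));
  }

  public static void main(String[] args) {
    // Runner settings (--runner, --flinkMaster, ...) come only from the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Illustrative in-memory input; a real pipeline would read from a source.
    p.apply("CreateLines", Create.of("hello beam", "hello flink"))
        .apply("SplitWords",
            FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> splitWords(line)));

    // Pipeline.run() uses whichever runner the options select.
    p.run().waitUntilFinish();
  }
}
```

When the jar is submitted with flink run, leaving --flinkMaster unset should let the Flink runner use the environment provided by the CLI (the option defaults to [auto]) instead of constructing the RemoteEnvironment that raised the error in the question.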
bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar --runner=FlinkRunner --other-parameters-for-your-pipeline-or-the-runner
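The trailing arguments in that command are parsed into PipelineOptions by the pipeline's own main method. A minimal sketch of declaring a pipeline-specific parameter, assuming a hypothetical --inputText option (the option name, its default, and the class OptionsFromArgs are illustrations only), might look like this:

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical example class showing how custom --flags map onto an options interface.
public class OptionsFromArgs {

  // Hypothetical pipeline-specific option; Beam derives the flag name from the getter.
  public interface Options extends PipelineOptions {
    @Description("Text to process (hypothetical example option)")
    @Default.String("hello beam")
    String getInputText();

    void setInputText(String value);
  }

  static {
    // Makes the interface known to PipelineOptionsFactory (e.g. for --help listings).
    PipelineOptionsFactory.register(Options.class);
  }

  static Options parse(String[] args) {
    // Same parsing call as in the question, minus any runner configuration.
    return PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  }

  public static void main(String[] args) {
    Options options = parse(args);
    System.out.println(options.getInputText());
  }
}
```

Passing --inputText=foo would set the value to foo; omitting the flag falls back to the declared default. Runner flags such as --runner=FlinkRunner are parsed the same way, which is why they do not need to be set in code.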
Note that Beam 2.25.0 currently has a bug in the Flink runner, so try version 2.24.0, or a later version once it is released.
https://stackoverflow.com/questions/63497392