如果有任何异常发生,我有用例重试逻辑。我已经添加了检查点和RestartStrategies,但是如果有任何异常,Flink不会重新启动作业。是否需要添加任何其他属性。你能给我举几个例子吗?
streamExecutionEnvironment.enableCheckpointing(2000);
// streamExecutionEnvironment.setParallelism(2);
// advanced options:
// set mode to exactly-once (this is the default)
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
streamExecutionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(3, TimeUnit.SECONDS) // delay
));发布于 2022-11-04 08:36:57
默认情况下,检查点存储在作业管理器的堆中,这是一个相当脆弱的设置(如果JM失败或重新启动,您将失去检查点)。为了使用FileSystemCheckpointStorage,您应该在flink-conf.yaml中配置检查点目录。
state.checkpoints.dir: file:///checkpoint-dir/或者在你的代码中
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");在第一个检查点完成之前作业也可能失败。
还必须有一些仍然可以重新启动作业的东西,例如作业管理器或kubernetes操作符。如果您在一个小型集群中运行,这是行不通的。
https://stackoverflow.com/questions/74313793
复制相似问题