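We have deployed a new Flink instance running version 1.4. When we try to restore savepoints taken on our old 1.2.1 deployment, every job we attempt to restore fails with the same error: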
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1360)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1350)
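... 10 more
The error message:
Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
seems to be wrong, though, since our other deployment is running 1.2.1.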
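The documentation page has not been updated for 1.4 yet: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html, but in the past parallelism seemed to be an issue. I tried using the same parallelism as the job the savepoint came from, and I still get the same problem.
Any hints on what could be causing this? How can it be solved?
Thanks!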
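Posted on 2018-01-05 15:12:58
So, we finally solved the problem.
We originally started running our jobs on Flink 1.1 and later migrated their savepoints to 1.2.1.
It looks like Flink 1.2.1 did not upgrade the savepoints in any way, so they were still in the old format, which Flink 1.4 does not support.
The solution was to run our jobs with those savepoints on Flink 1.3 and take a new savepoint there, which is written in the new format. That savepoint is finally compatible with Flink 1.4 :)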
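Posted on 2018-01-05 13:08:25
With version 1.4.0, Flink no longer supports restoring state that was taken through the Checkpointed interface. To do a stateful upgrade, you have to do the following:
1. Replace Checkpointed with CheckpointedFunction.
2. Implement the CheckpointedRestoring interface to restore from the Checkpointed savepoint.
3. Take a new savepoint with Flink <= 1.3.
4. Remove the CheckpointedRestoring interface.
5. Resume from the migrated savepoint with Flink 1.4.
Let me know if you run into any other problems while migrating your job.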
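To make the first step more concrete, here is a minimal sketch of a function that keeps its state through CheckpointedFunction instead of Checkpointed. The class name CountingMapper, the state name "count", and the counting logic are all hypothetical and only illustrate the shape of the interface; the job in the question is not shown, so its real state will look different. It assumes Flink 1.3 or later on the classpath.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Hypothetical operator: emits a running count of the records it has seen. */
public class CountingMapper implements MapFunction<String, Long>, CheckpointedFunction {

    // In-memory working copy of the state.
    private long count;

    // Operator state handle managed by Flink. Transient because the function
    // object itself is serialized when the job is shipped to the cluster.
    private transient ListState<Long> checkpointedCount;

    @Override
    public Long map(String value) {
        return ++count;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called on every checkpoint/savepoint: write the current count.
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        if (context.isRestored()) {
            // After rescaling, one subtask may receive several partial counts.
            count = 0L;
            for (Long partial : checkpointedCount.get()) {
                count += partial;
            }
        }
    }

    // Only while passing through Flink 1.2/1.3: additionally implement
    // CheckpointedRestoring<Long> with
    //     public void restoreState(Long state) { count = state; }
    // so the legacy 'Checkpointed' savepoint can be read once, then drop that
    // interface again after the new savepoint has been taken.
}
```

The state is written as a list so that Flink can redistribute the entries if the job is later restored with a different parallelism; summing the entries on restore is one possible choice for a counter and may not fit other kinds of state.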
https://stackoverflow.com/questions/48098737