首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法将保存点从1.2.1还原为1.4

无法将保存点从1.2.1还原为1.4
EN

Stack Overflow用户
提问于 2018-01-04 15:44:18
回答 2查看 210关注 0票数 2

我们已经用1.4版本部署了一个新的Flink实例。在尝试从旧的1.2.1部署恢复保存点时,尝试还原的所有作业都会出现相同的错误:

代码语言:javascript
复制
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1360)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1336)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserializeSubtaskState(SavepointV1Serializer.java:171)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:96)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:54)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:278)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:70)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1141)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1350)
    ... 10 more

错误信息:

遗产状态(来自Flink <= 1.1,通过“校验”接口创建)不再支持从Flink 1.4开始。请重写你的工作,用“CheckpointedFunction”代替!

但是,似乎是错误的,因为我们的另一个部署正在运行1.2.1。

文档页仍未更新为1.4:https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html,但在过去,并行性似乎一直是一个问题。我试过使用与保存点( savepoint )即将到来的工作相同的工作,并且仍然使用相同的问题。

有什么可以导致这件事的提示吗?如何解决呢?

谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-01-05 15:12:58

所以,终于解决了这个问题。

我们开始在Flink 1.1中运行任务,然后将任务的保存点迁移到1.2.1。

看起来Flink 1.2.1并没有对保存点进行任何升级,所以它们仍然有旧的格式,Flink 1.4不支持这种格式。

解决方案是在Flink 1.3中运行我们的任务+保存点,并在那里创建一个新的保存点,它将以新的格式保存。此版本最终与Flink 1.4兼容:)

票数 0
EN

Stack Overflow用户

发布于 2018-01-05 13:08:25

使用1.4.0版本,Flink不再支持从Checkpointed接口获取的状态恢复。要进行有状态升级,必须执行以下操作:

  1. 在Flink 1.2.1上使用作业的保存点
  2. 在所有有状态函数中将Checkpointed替换为CheckpointedFunction
  3. 实现CheckpointedRestoring接口以从Checkpointed保存点恢复
  4. 在Flink 1.2.1上执行修改后的作业,并接受第二个保存点
  5. 从所有有状态函数中删除CheckpointedRestoring接口
  6. 使用Flink 1.4.0上的第二个保存点运行修改过的作业

在迁移你的工作时,让我知道是否还有其他问题。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48098737

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档