我有一条简单的管道
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName)现在,我尝试将uid简单地添加到水槽中,如下所示
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName).uid("kafka-sink")但我得到了一个很长的例外,这似乎是信息的一部分:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/tmp/rocksdb/savepoint-445173-011657873d74. Cannot map checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1175)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:299)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)这说得通吗?是否有办法在不丢失保存点的情况下解决它?
发布于 2019-07-10 19:25:00
这个问题是有意义的,因为如果您不手动指定ID,它们将自动生成。可能生成的ID是3cfeb06db0484d5556a7de8db2025f09。
你有三个选择:
3cfeb06db0484d5556a7de8db2025f09作为运算符的uid。以下是一些可能对您有帮助的链接:
https://stackoverflow.com/questions/56977241
复制相似问题