首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法使用Flink Processor API恢复检查点状态

无法使用Flink Processor API恢复检查点状态
EN

Stack Overflow用户
提问于 2020-10-27 12:00:10
回答 1查看 134关注 0票数 0

主程序是消费kafka事件,然后过滤-> map -> keyBy -> -> sink。我编写了另一个单独的简单程序来读取检查点目录,如下所示:

代码语言:javascript
复制
object StateReader extends App {

  val path = "file://...."

  val env = ExecutionEnvironment.getExecutionEnvironment

  val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))

  val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
  println(ds.count())

}

class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
  override def open(parameters: Configuration): Unit = {

  }
  override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {

  }//end readKey
}//end class CepOperatorReadFunction

然而,我得到了以下异常:

代码语言:javascript
复制
Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more

以下是flink-conf.yaml中的一些配置

代码语言:javascript
复制
state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0

你知道为什么会发生异常以及如何解决这个问题吗?

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-27 23:15:53

没有开箱即用的支持,无法轻松读取CEP操作员的状态。因此,要实现您的KeyedStateReaderFunction,您必须深入研究CEP实现,找到使用的ValueStateMapState,并实现一个使用这些状态描述符的读取器。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64548139

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档