首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >flink流作业是否在作业运行之间保持其关键值状态?

flink流作业是否在作业运行之间保持其关键值状态?
EN

Stack Overflow用户
提问于 2022-11-05 22:53:36
回答 1查看 51关注 0票数 0

我们的使用是,我们希望使用flink流的去复制作业,它从源读取它的数据(kafka主题),并将独特的记录写入hdfs文件接收器。Kafka主题可能有重复的数据,可以使用复合键(adserver_id,unix_timestamp of the record)来识别这些数据。

因此,我决定使用flink键状态流来实现去复制。

代码语言:javascript
复制
val messageStream: DataStream[String] = env.addSource(flinkKafkaConsumer)

messageStream
  .map{
    record =>
      val key = record.adserver_id.get + record.event_timestamp.get
      (key,record)
  }
  .keyBy(_._1)
  .flatMap(new DedupDCNRecord())
  .map(_.toString)
  .addSink(sink)

  // execute the stream
  env.execute(applicationName)
}

下面是使用flink的值状态进行去复制的代码。

代码语言:javascript
复制
class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
  private var operatorState: ValueState[String] = null

  override def open(configuration: Configuration) = {
    operatorState = getRuntimeContext.getState(
      DedupDCNRecord.descriptor
    )
  }

  @throws[Exception]
  override def flatMap(value: (String,DCNRecord), out: Collector[DCNRecord]): Unit = {

    if (operatorState.value == null) { // we haven't seen the element yet
      out.collect(value._2)
      // set operator state to true so that we don't emit elements with this key again
      operatorState.update(value._1)
    }
  }
}

虽然这种方法工作良好,只要流作业通过valueState运行和维护唯一键列表并执行去复制操作。但是,一旦我取消了作业,flink就会为valueState(只保留当前运行的唯一键)松开它的状态(在前一次作业运行中看到的唯一键),并让记录通过,这些记录在作业的前一次运行中已经处理过了。有什么方法,我们可以强制flink来维护--它是valueState(unique_keys) --到目前为止?感谢你的帮助。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-06 10:52:08

这要求在关闭作业之前捕获状态快照,然后从该快照重新启动:

  1. 在拍摄状态快照的同时,执行停止使用保存点以降低当前作业。
  2. 重新发射,使用保存点作为起点

有关一步一步的教程,请参见提升和重新分配工作中的Flink操作游乐场。在这里,关于观察故障与恢复的部分也是相关的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74332041

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档