我们的使用是,我们希望使用flink流的去复制作业,它从源读取它的数据(kafka主题),并将独特的记录写入hdfs文件接收器。Kafka主题可能有重复的数据,可以使用复合键(adserver_id,unix_timestamp of the record)来识别这些数据。
因此,我决定使用flink键状态流来实现去复制。
val messageStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
messageStream
.map{
record =>
val key = record.adserver_id.get + record.event_timestamp.get
(key,record)
}
.keyBy(_._1)
.flatMap(new DedupDCNRecord())
.map(_.toString)
.addSink(sink)
// execute the stream
env.execute(applicationName)
}下面是使用flink的值状态进行去复制的代码。
class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
private var operatorState: ValueState[String] = null
override def open(configuration: Configuration) = {
operatorState = getRuntimeContext.getState(
DedupDCNRecord.descriptor
)
}
@throws[Exception]
override def flatMap(value: (String,DCNRecord), out: Collector[DCNRecord]): Unit = {
if (operatorState.value == null) { // we haven't seen the element yet
out.collect(value._2)
// set operator state to true so that we don't emit elements with this key again
operatorState.update(value._1)
}
}
}虽然这种方法工作良好,只要流作业通过valueState运行和维护唯一键列表并执行去复制操作。但是,一旦我取消了作业,flink就会为valueState(只保留当前运行的唯一键)松开它的状态(在前一次作业运行中看到的唯一键),并让记录通过,这些记录在作业的前一次运行中已经处理过了。有什么方法,我们可以强制flink来维护--它是valueState(unique_keys) --到目前为止?感谢你的帮助。
发布于 2022-11-06 10:52:08
这要求在关闭作业之前捕获状态快照,然后从该快照重新启动:
有关一步一步的教程,请参见提升和重新分配工作中的Flink操作游乐场。在这里,关于观察故障与恢复的部分也是相关的。
https://stackoverflow.com/questions/74332041
复制相似问题