首先,这与Kafka consuming the latest message again when I rerun the Flink consumer非常相似,但并不相同。这个问题的答案似乎解决不了我的问题。如果我在答案中遗漏了什么,那么请重新表述答案,因为我显然遗漏了一些东西。
问题是完全一样的,尽管-- Flink (卡夫卡连接器)重新运行它在关闭之前看到的最后3-9条消息。
我的版本
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91我的密码
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(500)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testing");
val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
env.addSource(kafkaConsumer)
.addSink(kafkaProducer)
env.execute()
}
}我的SBT属地
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.1.2",
"org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
"org.apache.flink" %% "flink-clients" % "1.1.2",
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)我的过程
(3个终端机)
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic我的期望
当系统中没有错误时,我希望能够打开或关闭flink,而无需重新处理在先前运行中成功完成流的消息。
我试图修复
我添加了对setStateBackend的调用,认为可能默认的内存后端记错了。这似乎没什么用。
我已经删除了对enableCheckpointing的调用,希望在中可能有一个单独的机制来跟踪状态。这似乎没什么用。
我使用了不同的接收器,RollingFileSink,print();希望bug可能在kafka中。这似乎没什么用。
我回到了flink (以及所有连接器) v1.1.0和v1.1.1,希望这个bug可能在最新版本中。这似乎没什么用。
我已经将zookeeper.connect配置添加到属性对象中,希望关于它仅在0.8中有用的评论是错误的。这似乎没什么用。
我已经显式地将检查点模式设置为EXACTLY_ONCE (好主意drfloob)。这似乎没什么用。
我的请求
帮助!
发布于 2016-09-14 09:37:02
(我在JIRA上发布了同样的回复,只是在这里交叉发布了相同的回复)
根据你的描述,我假设你是在手动关闭这份工作,然后重新提交,对吗?
除非使用保存点(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则Flink不会完全保留--一次手动重新启动作业。精确一次保证是指作业何时失败,然后自动从以前的检查点恢复自身(当启用检查点时,就像您对env.enableCheckpointing(500)所做的那样)。
实际上,卡夫卡消费者只是在手动重新提交作业时,开始阅读ZK / Kafka中提交的现有抵消项。当你第一次执行这项任务时,这些补偿都是对ZK / Kafka的承诺。然而,它们并不用于Flink的精确一次语义;Flink使用内部校验卡夫卡偏移量。卡夫卡消费者将这些补偿提交给ZK,仅仅是为了向外界披露工作消费的进展情况(wrt Flink)。
发布于 2016-09-14 09:04:13
更新2:我用偏移处理修复了错误,它被合并到当前的主机中。
更新:没有问题,在取消工作之前使用手动保存点(多亏戈登)
我检查了日志,这似乎是偏移处理中的一个错误。我在https://issues.apache.org/jira/browse/FLINK-4618下提交了一份报告。当我得到反馈时,我会更新这个答案。
https://stackoverflow.com/questions/39459315
复制相似问题