首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >重新启动时的Kafka & Flink重复消息

重新启动时的Kafka & Flink重复消息
EN

Stack Overflow用户
提问于 2016-09-12 21:50:11
回答 2查看 2.5K关注 0票数 7

首先,这与Kafka consuming the latest message again when I rerun the Flink consumer非常相似,但并不相同。这个问题的答案似乎解决不了我的问题。如果我在答案中遗漏了什么,那么请重新表述答案,因为我显然遗漏了一些东西。

问题是完全一样的,尽管-- Flink (卡夫卡连接器)重新运行它在关闭之前看到的最后3-9条消息。

我的版本

代码语言:javascript
复制
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的密码

代码语言:javascript
复制
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

我的SBT属地

代码语言:javascript
复制
libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的过程

(3个终端机)

代码语言:javascript
复制
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统中没有错误时,我希望能够打开或关闭flink,而无需重新处理在先前运行中成功完成流的消息。

我试图修复

我添加了对setStateBackend的调用,认为可能默认的内存后端记错了。这似乎没什么用。

我已经删除了对enableCheckpointing的调用,希望在中可能有一个单独的机制来跟踪状态。这似乎没什么用。

我使用了不同的接收器,RollingFileSink,print();希望bug可能在kafka中。这似乎没什么用。

我回到了flink (以及所有连接器) v1.1.0和v1.1.1,希望这个bug可能在最新版本中。这似乎没什么用。

我已经将zookeeper.connect配置添加到属性对象中,希望关于它仅在0.8中有用的评论是错误的。这似乎没什么用。

我已经显式地将检查点模式设置为EXACTLY_ONCE (好主意drfloob)。这似乎没什么用。

我的请求

帮助!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-09-14 09:37:02

(我在JIRA上发布了同样的回复,只是在这里交叉发布了相同的回复)

根据你的描述,我假设你是在手动关闭这份工作,然后重新提交,对吗?

除非使用保存点(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则Flink不会完全保留--一次手动重新启动作业。精确一次保证是指作业何时失败,然后自动从以前的检查点恢复自身(当启用检查点时,就像您对env.enableCheckpointing(500)所做的那样)。

实际上,卡夫卡消费者只是在手动重新提交作业时,开始阅读ZK / Kafka中提交的现有抵消项。当你第一次执行这项任务时,这些补偿都是对ZK / Kafka的承诺。然而,它们并不用于Flink的精确一次语义;Flink使用内部校验卡夫卡偏移量。卡夫卡消费者将这些补偿提交给ZK,仅仅是为了向外界披露工作消费的进展情况(wrt Flink)。

票数 10
EN

Stack Overflow用户

发布于 2016-09-14 09:04:13

更新2:我用偏移处理修复了错误,它被合并到当前的主机中。

更新:没有问题,在取消工作之前使用手动保存点(多亏戈登)

我检查了日志,这似乎是偏移处理中的一个错误。我在https://issues.apache.org/jira/browse/FLINK-4618下提交了一份报告。当我得到反馈时,我会更新这个答案。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39459315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档