首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink + Kafka:为什么我会丢失消息?

Flink + Kafka:为什么我会丢失消息?
EN

Stack Overflow用户
提问于 2015-11-03 22:27:00
回答 2查看 2.8K关注 0票数 2

我已经写了一个非常简单的Flink流作业,从卡夫卡使用FlinkKafkaConsumer082的数据。

代码语言:javascript
复制
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
    Properties result = new Properties();
    result.put("bootstrap.servers", getBrokerUrl());
    result.put("zookeeper.connect", getZookeeperUrl());
    result.put("group.id", getGroup());

        return env.addSource(
                new FlinkKafkaConsumer082<>(
                        topic,
                        new SimpleStringSchema(), result);
}

这非常有效,每当我在Kafka上的主题中放入一些东西时,它就会被我的Flink工作接收并处理。现在我试着看看如果我的Flink Job由于某种原因没有在线会发生什么。因此,我关闭了flink作业,并继续向Kafka发送消息。然后我再次开始我的Flink工作,并期望它能处理同时发送的消息。

然而,我得到了这样的信息:

代码语言:javascript
复制
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]

因此,它基本上忽略了自上次关闭Flink作业以来收到的所有消息,只是在队列末尾开始读取。从我收集的FlinkKafkaConsumer082文档中可以看出,它自动负责与Kafka代理同步处理的偏移量。然而,情况似乎并非如此。

我使用的是单节点Kafka安装( Kafka发行版附带的安装)和单节点Zookeper安装(也是Kafka发行版捆绑的安装)。

我怀疑它是某种配置错误或类似的东西,但我真的不知道从哪里开始查找。还有没有其他人遇到过这个问题,也许已经解决了?

EN

回答 2

Stack Overflow用户

发布于 2015-11-05 16:28:27

我找到原因了。您需要在StreamExecutionEnvironment中显式地启用检查点,以使Kafka连接器将处理后的偏移量写入Zookeeper。如果您不启用它,Kafka连接器将不会写入最后读取的偏移量,因此当收集作业重新启动时,它将无法从那里恢复。所以一定要这样写:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important part

Anatoly关于更改初始偏移量的建议可能仍然是一个好主意,以防检查点由于某种原因而失败。

票数 4
EN

Stack Overflow用户

发布于 2015-11-03 22:57:06

https://kafka.apache.org/08/configuration.html

将auto.offset.reset设置为最小(默认情况下为最大)

auto.offset.reset

当Zookeeper中没有初始偏移量或者偏移量超出范围时该怎么办:

最小:自动将偏移量重置为最小偏移量

最大:自动将偏移量重置为最大偏移量

其他:向消费者抛出异常。

如果将其设置为最大,则当其订阅的主题的分区数量在代理上发生变化时,使用者可能会丢失一些消息。为防止在添加分区时丢失数据,请将auto.offset.reset设置为最小

还要确保getGroup()在重启后是相同的

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33501574

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档