我已经写了一个非常简单的Flink流作业,从卡夫卡使用FlinkKafkaConsumer082的数据。
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}这非常有效,每当我在Kafka上的主题中放入一些东西时,它就会被我的Flink工作接收并处理。现在我试着看看如果我的Flink Job由于某种原因没有在线会发生什么。因此,我关闭了flink作业,并继续向Kafka发送消息。然后我再次开始我的Flink工作,并期望它能处理同时发送的消息。
然而,我得到了这样的信息:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]因此,它基本上忽略了自上次关闭Flink作业以来收到的所有消息,只是在队列末尾开始读取。从我收集的FlinkKafkaConsumer082文档中可以看出,它自动负责与Kafka代理同步处理的偏移量。然而,情况似乎并非如此。
我使用的是单节点Kafka安装( Kafka发行版附带的安装)和单节点Zookeper安装(也是Kafka发行版捆绑的安装)。
我怀疑它是某种配置错误或类似的东西,但我真的不知道从哪里开始查找。还有没有其他人遇到过这个问题,也许已经解决了?
发布于 2015-11-05 16:28:27
我找到原因了。您需要在StreamExecutionEnvironment中显式地启用检查点,以使Kafka连接器将处理后的偏移量写入Zookeeper。如果您不启用它,Kafka连接器将不会写入最后读取的偏移量,因此当收集作业重新启动时,它将无法从那里恢复。所以一定要这样写:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important partAnatoly关于更改初始偏移量的建议可能仍然是一个好主意,以防检查点由于某种原因而失败。
发布于 2015-11-03 22:57:06
https://kafka.apache.org/08/configuration.html
将auto.offset.reset设置为最小(默认情况下为最大)
auto.offset.reset
当Zookeeper中没有初始偏移量或者偏移量超出范围时该怎么办:
最小:自动将偏移量重置为最小偏移量
最大:自动将偏移量重置为最大偏移量
其他:向消费者抛出异常。
如果将其设置为最大,则当其订阅的主题的分区数量在代理上发生变化时,使用者可能会丢失一些消息。为防止在添加分区时丢失数据,请将auto.offset.reset设置为最小
还要确保getGroup()在重启后是相同的
https://stackoverflow.com/questions/33501574
复制相似问题