使用新的Kafka Java使用者api,我运行单个使用者来消费消息。当所有可用消息都被使用完后,我用kill -15终止它。
现在我想重置偏移量开始。我想避免仅仅使用不同的消费者群体。我尝试的是下面的调用序列,使用与刚刚读取完数据的消费者相同的组。
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));我以为我已经在测试中完成了这项工作,但现在我总是得到:
ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)将assign和commitSync结合起来在原则上是错误的吗,可能是因为只有subscribe和commitSync可以一起使用?文档只说assign不支持subscribe,但我认为这只适用于一个消费者进程。(事实上,我甚至希望在另一个消费者启动时运行偏移量重置消费者,希望另一个消费者可以注意到偏移量的变化并重新开始。但先将其关闭也没问题。)
有什么想法吗?
发布于 2016-04-18 18:55:31
找到了问题所在。如果我们遵守以下条件,我的问题中描述的方法可以很好地工作:
group.id。即使使用者只订阅了其他主题,这也会阻碍在调用subscribe().而不是assign()之后提交主题偏移量
group.max.session.timeout.ms)才能成功。来自kafka的指示性日志消息是组X世代Y已死,已删除
一旦这一点出现在日志中,序列
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));才能成功。
发布于 2016-04-13 01:51:05
为什么一开始还要提交偏移量?在Properties中将enable.auto.commit设置为false,如果您只是在重启时重新读取所有消息,则根本不要提交它。
要重置偏移量,可以使用例如these methods
public void seek(TopicPartition partition, long offset)
public void seekToBeginning(TopicPartition... partitions)https://stackoverflow.com/questions/36579827
复制相似问题