首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka新消费者:(重新)使用赋值,而不是订阅来设置和提交偏移量

Kafka新消费者:(重新)使用赋值,而不是订阅来设置和提交偏移量
EN

Stack Overflow用户
提问于 2016-04-13 01:09:14
回答 2查看 4.2K关注 0票数 3

使用新的Kafka Java使用者api,我运行单个使用者来消费消息。当所有可用消息都被使用完后,我用kill -15终止它。

现在我想重置偏移量开始。我想避免仅仅使用不同的消费者群体。我尝试的是下面的调用序列,使用与刚刚读取完数据的消费者相同的组。

代码语言:javascript
复制
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

我以为我已经在测试中完成了这项工作,但现在我总是得到:

代码语言:javascript
复制
ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)

assigncommitSync结合起来在原则上是错误的吗,可能是因为只有subscribecommitSync可以一起使用?文档只说assign不支持subscribe,但我认为这只适用于一个消费者进程。(事实上,我甚至希望在另一个消费者启动时运行偏移量重置消费者,希望另一个消费者可以注意到偏移量的变化并重新开始。但先将其关闭也没问题。)

有什么想法吗?

EN

回答 2

Stack Overflow用户

发布于 2016-04-18 18:55:31

找到了问题所在。如果我们遵守以下条件,我的问题中描述的方法可以很好地工作:

  1. 可能没有其他使用者正在运行目标group.id。即使使用者只订阅了其他主题,这也会阻碍在调用subscribe().

而不是assign()之后提交主题偏移量

  1. 在最后一个消费者停止后,需要30秒(我认为是group.max.session.timeout.ms)才能成功。来自kafka的指示性日志消息是

组X世代Y已死,已删除

一旦这一点出现在日志中,序列

代码语言:javascript
复制
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

才能成功。

票数 3
EN

Stack Overflow用户

发布于 2016-04-13 01:51:05

为什么一开始还要提交偏移量?在Properties中将enable.auto.commit设置为false,如果您只是在重启时重新读取所有消息,则根本不要提交它。

要重置偏移量,可以使用例如these methods

代码语言:javascript
复制
public void seek(TopicPartition partition, long offset)
public void seekToBeginning(TopicPartition... partitions)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36579827

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档