首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka 0.9新消费者api -如何观看消费者偏移量

Kafka 0.9新消费者api -如何观看消费者偏移量
EN

Stack Overflow用户
提问于 2016-02-11 16:12:41
回答 2查看 997关注 0票数 1

我正在尝试使用Java API监视给定组的消费者偏移量。我创建了另一个消费者,它不订阅任何主题,只是调用consumer.committed(topic)来获取偏移量信息。这种方式是可行的,但是:

对于测试,我只使用了一个真实的消费者(即订阅了该主题的消费者)。尽管我使用的是poll(1000),但当我使用close()关闭它并稍后重新启动它时,从订阅到第一次消费消息之间需要27秒。

我猜这与再平衡有关,可能被非订阅的消费者搞混了。这有可能吗?有没有更好的方法来使用Java API监控偏移量(我知道命令行工具,但需要使用API)。

EN

回答 2

Stack Overflow用户

发布于 2016-02-12 03:25:51

有不同的方法来检查主题的偏移量,这取决于你想要它的目的,除了你上面描述的“提交”之外,这里还有两个选项:

1)如果你想知道下次线程启动时消费者开始从代理获取数据的偏移量id,那么你必须使用"position“作为

代码语言:javascript
复制
long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
    offsetPosition = kafkaConsumer.position(tPartition);
    System.out.println("offset of the next record to fetch is : " + position);

2)从kafkaConsumer轮询后,从ConsumerRecord对象调用"offset()“方法

代码语言:javascript
复制
Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}
票数 1
EN

Stack Overflow用户

发布于 2016-02-12 20:59:50

发现它:监控消费者增加了混乱,但不是罪魁祸首。最后,这很容易理解,尽管有点出乎意料(至少对我来说是这样):

session.timeout.ms的默认值为30秒。当一个消费者消失时,它需要30秒的时间才能被宣布死亡,并重新平衡工作。为了测试,我已经停止了我的单一消费者,等待了三秒钟,并重新启动了一个新的消费者。然后,它花了27秒才开始,填补了30秒的超时。

我曾期望一个单独的用户启动时不会等待超时到期,而是开始“重新平衡”,即立即抢占工作。似乎超时必须在工作重新平衡之前到期,即使只有一个消费者也是如此。

为了更快地通过测试,我更改了配置,以便对使用者使用较低的session.timeout.ms,对代理使用较低的group.min.session.timeout.ms

总而言之:使用不订阅任何主题的使用者来监控偏移效果很好,而且似乎不会干扰重新平衡过程。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35333935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档