我正在尝试使用Java API监视给定组的消费者偏移量。我创建了另一个消费者,它不订阅任何主题,只是调用consumer.committed(topic)来获取偏移量信息。这种方式是可行的,但是:
对于测试,我只使用了一个真实的消费者(即订阅了该主题的消费者)。尽管我使用的是poll(1000),但当我使用close()关闭它并稍后重新启动它时,从订阅到第一次消费消息之间需要27秒。
我猜这与再平衡有关,可能被非订阅的消费者搞混了。这有可能吗?有没有更好的方法来使用Java API监控偏移量(我知道命令行工具,但需要使用API)。
发布于 2016-02-12 03:25:51
有不同的方法来检查主题的偏移量,这取决于你想要它的目的,除了你上面描述的“提交”之外,这里还有两个选项:
1)如果你想知道下次线程启动时消费者开始从代理获取数据的偏移量id,那么你必须使用"position“作为
long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic,partitionToReview);
offsetPosition = kafkaConsumer.position(tPartition);
System.out.println("offset of the next record to fetch is : " + position);2)从kafkaConsumer轮询后,从ConsumerRecord对象调用"offset()“方法
Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
ConsumerRecord<byte[],byte[]> record = it.next();
System.out.println("offset : " + record.offset());
}发布于 2016-02-12 20:59:50
发现它:监控消费者增加了混乱,但不是罪魁祸首。最后,这很容易理解,尽管有点出乎意料(至少对我来说是这样):
session.timeout.ms的默认值为30秒。当一个消费者消失时,它需要30秒的时间才能被宣布死亡,并重新平衡工作。为了测试,我已经停止了我的单一消费者,等待了三秒钟,并重新启动了一个新的消费者。然后,它花了27秒才开始,填补了30秒的超时。
我曾期望一个单独的用户启动时不会等待超时到期,而是开始“重新平衡”,即立即抢占工作。似乎超时必须在工作重新平衡之前到期,即使只有一个消费者也是如此。
为了更快地通过测试,我更改了配置,以便对使用者使用较低的session.timeout.ms,对代理使用较低的group.min.session.timeout.ms。
总而言之:使用不订阅任何主题的使用者来监控偏移效果很好,而且似乎不会干扰重新平衡过程。
https://stackoverflow.com/questions/35333935
复制相似问题