我正在编写一个Kafka消费者应用程序,其中我有一个消费者为每个分区。代码如下所示
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e); }});
}是否有一种以编程方式访问和打印使用者滞后偏移量的方法,或者以其他方式表示使用者读取的最后一条记录的偏移量与某个生产者写入该使用者分区的最后一条记录的偏移量之间的位置差异。
我应该添加哪些语句来获得滞后偏移值,同时知道我的最终目标是将该值发送到prometheus进行监视?
发布于 2020-10-26 12:47:54
如果您的目标是在prometheus中获得数据,那么您应该使用消费者发出的records-lag指标。
在文档中的消费者获取计量部分中的最后一个表中,您可以看到使用者在每个主题分区中发出滞后(和引导)。
默认情况下,度量是通过JMX发出的,因此您可以使用普罗米修斯的JMX出口商来完成这项工作。
发布于 2020-10-26 12:48:54
有多种选择,您可以做,以监测您的消费者滞后。
使用KafkaConsumer的endOffsets API
根据KafkaConsumer上的KafkaConsumer,您可以使用endOffsets方法“获取给定分区的结束偏移量”。
使用JMX度量
您可以使用JMX度量来使用Kafka的监视功能。在关于监控卡夫卡的文档中,我们解释了如何获取“消费者滞后于生产者的消息数量,而不是由代理发布的消息数量”。
https://stackoverflow.com/questions/64536722
复制相似问题