我将"node-rdkafka“npm模块用于我们用Nodejs编写的分布式服务架构。我们有一个计量的用例,在这个用例中,我们只允许每n秒消耗和处理一定数量的消息。例如,“主”主题有100条消息由生产者推送,“工人”每30秒消费一次主主题。这个用例的故事还有更多的内容。
我遇到的问题是,我需要程序化地获取给定主题(所有分区)的滞后。
有没有办法让我做到这一点?
我知道我可以使用"bin/kafka-consumer-groups.sh“来访问一些我需要的数据,但是有没有其他方法呢?
提前谢谢你
发布于 2018-04-09 21:36:06
您可以通过以下几种方法直接从node-rdkafka客户端检索该信息:
客户端可以按定义的时间间隔发出指标,这些指标包含当前和提交的偏移量以及结束偏移量,因此您可以轻松地计算延迟。
您首先需要通过在客户端配置中设置例如'statistics.interval.ms': 5000来启用指标事件。然后在event.stats事件上设置一个监听器:
consumer.on('event.stats',function(stats) { console.log(stats);});
完整的统计信息记录在https://github.com/edenhill/librdkafka/wiki/Statistics上,但您最感兴趣的可能是分区统计信息:https://github.com/edenhill/librdkafka/wiki/Statistics#partitions
您可以使用queryWatermarkOffsets()检索分区的第一个和最后一个偏移量。
consumer.queryWatermarkOffsets(topicName,partition,timeout,function(err,offsets) { var high = offsets.highOffset;var low = offsets.lowOffset;});
然后使用消费者的当前位置(position())或承诺(committed())偏移量来计算滞后。
发布于 2018-04-09 16:25:50
Kafka通过jmx公开" records - lag - max“mbean,这是一个分区的滞后中的最大记录数,因此您可以通过jmx查询此mbean的滞后。
有关暴露的jmx mbean的详细信息,请参阅下面的文档。https://docs.confluent.io/current/kafka/monitoring.html#consumer-group-metrics
https://stackoverflow.com/questions/49727191
复制相似问题