我有一个星星之火的结构化流应用程序从kafka消费,对于这个应用程序,我想监测消费者的滞后。我使用下面的命令来检查用户延迟。但是,我没有得到当前的偏移量,因此滞后也是空白的。这是预期的吗?它适用于其他基于python的消费者。
命令
kafka-consumer-groups --bootstrap-server <bootstrap-server>:<port> --describe --all-groups输出
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 21 - 5546 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 7 - 5129 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 3 - 5178 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 9 - 4969 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 2 - 5443 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3
spark-kafka-source-b5e8d872-f727-4ed0-a82c-a3d279647942-407459747-driver-0 my_topic 15 - 5312 - consumer-3-bc651181-fc62-4b1a-abdf-fb3e9d244df8 /<ip-address> consumer-3发布于 2021-01-22 16:55:47
“然而,我没有得到当前的偏移量,因此滞后也是空白的。这是预期的吗?”
是的,这是预期的行为,因为星火结构化的流媒体应用程序没有向Kafka提交任何补偿。因此,当前的偏移量和该消费群体的滞后将不会存储在Kafka中,您将看到您所展示的消费者组工具的确切结果。
我写了一个更全面的答案,关于消费集团和星火结构流应用如何管理卡夫卡抵消here。
https://stackoverflow.com/questions/65847816
复制相似问题