首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用kafka-python统计主题中的记录(消息)数量

如何使用kafka-python统计主题中的记录(消息)数量
EN

Stack Overflow用户
提问于 2020-10-07 23:46:18
回答 3查看 2.5K关注 0票数 3

正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用kafka-python库的解决方案。有谁知道吗?

EN

回答 3

Stack Overflow用户

发布于 2020-10-07 23:53:34

没有特定的API来统计来自某个主题的记录数量。您需要消费并统计您从kafka消费者那里收到的记录数量。

票数 2
EN

Stack Overflow用户

发布于 2020-10-08 00:00:26

一种解决方案是,您可以向所有分区分别添加一条消息,并获得最后一个偏移量。根据偏移量,您可以计算到目前为止发送到主题的总消息数。

但这不是正确的方法。您不知道消费者已经消费了多少条消息,kafka删除了多少条消息。唯一的方法是你可以消费消息并计算数量。

票数 1
EN

Stack Overflow用户

发布于 2021-10-19 23:46:48

我不能在kafka-python中做到这一点,但是我可以用confluent-kafka库很容易地做到:

代码语言:javascript
复制
from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"

def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })

    consumer.subscribe([topic])

    total_message_count = 0
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        total_message_count = total_message_count + 1
        print('Received message {}: {}'.format(total_message_count,     
msg.value().decode('utf-8')))

    consumer.close()

    print(total_message_count)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64247670

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档