正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用kafka-python库的解决方案。有谁知道吗?
发布于 2020-10-07 23:53:34
没有特定的API来统计来自某个主题的记录数量。您需要消费并统计您从kafka消费者那里收到的记录数量。
发布于 2020-10-08 00:00:26
一种解决方案是,您可以向所有分区分别添加一条消息,并获得最后一个偏移量。根据偏移量,您可以计算到目前为止发送到主题的总消息数。
但这不是正确的方法。您不知道消费者已经消费了多少条消息,kafka删除了多少条消息。唯一的方法是你可以消费消息并计算数量。
发布于 2021-10-19 23:46:48
我不能在kafka-python中做到这一点,但是我可以用confluent-kafka库很容易地做到:
from confluent_kafka import Consumer
topic = "test_topic"
broker = "localhost:9092"
def get_count():
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'my-group',
'auto.offset.reset': 'earliest',
})
consumer.subscribe([topic])
total_message_count = 0
while True:
msg = consumer.poll(1.0)
if msg is None:
print("No more messages")
break
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
total_message_count = total_message_count + 1
print('Received message {}: {}'.format(total_message_count,
msg.value().decode('utf-8')))
consumer.close()
print(total_message_count)https://stackoverflow.com/questions/64247670
复制相似问题