我是Kafka的新手,我已经尝试过Kafka-Python包。
我设法设置了一个简单的生产者和消费者,它可以发送和接收消息。在这种情况下,消费者没有使用消费组,如下所示:
consumer = KafkaConsumer(queue_name, bootstrap_servers='kafka:9092')但是,当我开始使用group_id时,如下所示,它停止接收任何消息:
consumer = KafkaConsumer(bootstrap_servers='kafka:9092', auto_offset_reset='earliest', group_id='my-group')
consumer.subscribe([queue_name])为了进行比较,我还尝试了confluent-kafka-python包,其中我有以下消费者代码,它也不起作用:
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
consumer.subscribe([queue_name])同样运行./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list会得到空的结果。
这里有我遗漏的配置吗?
发布于 2019-03-19 17:17:21
默认情况下,使用者从上次提交的偏移量开始消费,这可能是您的情况下的最后一个偏移量。
仅当没有承诺的偏移时,auto.offset.reset才适用。因为默认情况下消费者会自动提交偏移量,它通常只在你第一次运行它的时候应用(还有其他几种情况,但在本例中无关紧要)。
因此,要查看消息流,您需要在消费者运行后立即开始生产,或者使用不同的组名以允许应用auto.offset.reset。
https://stackoverflow.com/questions/55236577
复制相似问题