消费者代码如下:
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
consumer.seek(0, 2)
for message in consumer:
print message
kafka.close()然后我用脚本生成消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic问题是,当我将使用者作为两个不同的进程启动时,我会在每个流程中接收新的消息。然而,我希望它只发送给一个消费者,而不是广播。
在卡夫卡(https://kafka.apache.org/documentation.html)的文档中写着:
如果所有的使用者实例都有相同的使用者组,那么这就像传统的队列平衡负载在使用者身上一样。
我看到,对于这些消费者来说,这个群体是相同的--my。
怎样才能使新的信息被一个消费者读取,而不是通过广播呢?
发布于 2014-03-22 01:18:32
消费者群体API直到kafka诉0.8.1 (2014年3月12日发布)才得到官方的支持。对于以前的服务器版本,使用者组不能正常工作。在这篇文章中,kafka-python库目前并不试图发送组偏移量数据:
发布于 2014-02-20 18:45:12
很难从上面的例子中看出你的动物园管理员配置是什么,或者是否有。您将需要一个集群来保存消费者组信息,WRT每个组中的使用者已经消耗到给定的偏移量。
下面是一个很好的例子:卡夫卡官方文档-消费者团体的例子
发布于 2014-03-27 03:40:08
这种情况不应该发生--确保两个消费者都是在动物园管理员znodes中的同一个消费者组下注册的。每个主题的每条消息都应该由一个消费群体使用一次,所以小组中的每个人中应该有一个收到消息,而不是您正在体验的信息。你用的是什么版本的卡夫卡?
https://stackoverflow.com/questions/21809636
复制相似问题