在python程序中,我想向Kafka写一些消息,然后读取来自不同主题的远程应用程序的相同数量的消息。问题是,当我完成发送消息时,另一端已经开始响应,当我开始阅读时,我只能获得消息批的尾部部分,或者根本没有消息,这取决于时间。这与我对包的理解相矛盾,即我认为如果我使用auto_offset_reset='latest'创建一个消费者,并订阅一个主题,那么它会在订阅时记住偏移量,当我迭代消费者对象时,它将开始从该偏移量读取消息。
下面是我要做的:
我首先创建了一个消费者并订阅了out主题:
consumer = KafkaConsumer(
bootstrap_servers=host+':'+broker_port,
group_id = "0",
auto_offset_reset='latest',
consumer_timeout_ms=10000
)
consumer.subscribe(topics=(topic_out))然后创建一个生产者并向topic_in发送消息:
producer = KafkaProducer(
bootstrap_servers=host+':'+broker_port
)
future = producer.send(topic,json.dumps(record).encode('utf-8'))
future.get(timeout=5)然后我开始阅读来自消费者的信息:
results = []
for msg in consumer:
message = json.loads(msg.value)
results.append(message)我在发送之前尝试了consumer.pause(),发送之后尝试了consumer.resume() -没有帮助。
我是不是在配置中遗漏了什么,或者我误解了Consumer的工作方式?
提前感谢!
发布于 2019-12-31 22:51:25
听起来你有种族问题。
一种解决方案是存储一个本地字典或sqlite表,该本地字典或sqlite表是通过使用这个“查找主题”构建的,然后当您使用“操作主题”时,您是在本地进行查找,而不是启动一个使用者来扫描“查找主题”以查找所需数据。
https://stackoverflow.com/questions/57959105
复制相似问题