首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >python同时使用KafkaConsumer和Producer

python同时使用KafkaConsumer和Producer
EN

Stack Overflow用户
提问于 2019-09-16 22:35:16
回答 1查看 613关注 0票数 0

在python程序中,我想向Kafka写一些消息,然后读取来自不同主题的远程应用程序的相同数量的消息。问题是,当我完成发送消息时,另一端已经开始响应,当我开始阅读时,我只能获得消息批的尾部部分,或者根本没有消息,这取决于时间。这与我对包的理解相矛盾,即我认为如果我使用auto_offset_reset='latest'创建一个消费者,并订阅一个主题,那么它会在订阅时记住偏移量,当我迭代消费者对象时,它将开始从该偏移量读取消息。

下面是我要做的:

我首先创建了一个消费者并订阅了out主题:

代码语言:javascript
复制
consumer = KafkaConsumer(
  bootstrap_servers=host+':'+broker_port, 
  group_id = "0", 
  auto_offset_reset='latest',
  consumer_timeout_ms=10000
)
consumer.subscribe(topics=(topic_out))

然后创建一个生产者并向topic_in发送消息:

代码语言:javascript
复制
producer = KafkaProducer(
   bootstrap_servers=host+':'+broker_port
)
future = producer.send(topic,json.dumps(record).encode('utf-8'))
future.get(timeout=5)

然后我开始阅读来自消费者的信息:

代码语言:javascript
复制
results = []
for msg in consumer:
   message = json.loads(msg.value)
   results.append(message)

我在发送之前尝试了consumer.pause(),发送之后尝试了consumer.resume() -没有帮助。

我是不是在配置中遗漏了什么,或者我误解了Consumer的工作方式?

提前感谢!

EN

回答 1

Stack Overflow用户

发布于 2019-12-31 22:51:25

听起来你有种族问题。

一种解决方案是存储一个本地字典或sqlite表,该本地字典或sqlite表是通过使用这个“查找主题”构建的,然后当您使用“操作主题”时,您是在本地进行查找,而不是启动一个使用者来扫描“查找主题”以查找所需数据。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57959105

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档