首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >producer.poll(0)不会产生任何消息,但producer.flush()可以工作

producer.poll(0)不会产生任何消息,但producer.flush()可以工作
EN

Stack Overflow用户
提问于 2019-12-04 17:05:43
回答 1查看 1.6K关注 0票数 1

根据kafka confluent issue的说法,当我使用producer.flush()时,它可以工作,但性能很差,但正如我建议的那样,我使用producer.poll(0),但不会向主题生成任何消息,是否需要任何配置,或者我在这里遗漏了什么?

代码语言:javascript
复制
self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)

self.producer.poll(0)  # -> doesn't work
self.producer.flush()  # -> works
EN

回答 1

Stack Overflow用户

发布于 2019-12-13 23:50:36

消息不会发送到kafka,因为没有时间这样做。你的应用程序正在提前终止。

这将会起作用:

代码语言:javascript
复制
self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)
timer.sleep(1) -- sleep for one seconds. 
self.producer.poll(0) 

Producer有两个缓冲区。第一发送缓冲区,第二响应缓冲区(来自kafka的响应)。

方法produce(...) -正在向发送缓冲区添加新消息。默认情况下,后台线程会尝试尽快发送消息,但它仍然需要一段时间来完成此操作。

方法poll(0) -正在检查响应缓冲区并执行回调方法。如果buffer为空,则不会发生任何事情。

方法flush() -正在检查两个缓冲区,直到所有消息都将被处理并执行回调方法。请在退出应用程序之前使用此方法。

示例用法。

代码语言:javascript
复制
def send(topic,message,callback_report):
   producer.produce(topic,message,callback=callback_report)
   producer.pool(0) // execute callback for previous messages,  

for msg in big_collection_of_messages:
  send('blabla',msg,delivery_report)


producer.flush()
//END OF APPLICATION 

请注意。这种解释是一个很大的简化。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59172333

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档