根据kafka confluent issue的说法,当我使用producer.flush()时,它可以工作,但性能很差,但正如我建议的那样,我使用producer.poll(0),但不会向主题生成任何消息,是否需要任何配置,或者我在这里遗漏了什么?
self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)
self.producer.poll(0) # -> doesn't work
self.producer.flush() # -> works发布于 2019-12-13 23:50:36
消息不会发送到kafka,因为没有时间这样做。你的应用程序正在提前终止。
这将会起作用:
self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)
timer.sleep(1) -- sleep for one seconds.
self.producer.poll(0) Producer有两个缓冲区。第一发送缓冲区,第二响应缓冲区(来自kafka的响应)。
方法produce(...) -正在向发送缓冲区添加新消息。默认情况下,后台线程会尝试尽快发送消息,但它仍然需要一段时间来完成此操作。
方法poll(0) -正在检查响应缓冲区并执行回调方法。如果buffer为空,则不会发生任何事情。
方法flush() -正在检查两个缓冲区,直到所有消息都将被处理并执行回调方法。请在退出应用程序之前使用此方法。
示例用法。
def send(topic,message,callback_report):
producer.produce(topic,message,callback=callback_report)
producer.pool(0) // execute callback for previous messages,
for msg in big_collection_of_messages:
send('blabla',msg,delivery_report)
producer.flush()
//END OF APPLICATION 请注意。这种解释是一个很大的简化。
https://stackoverflow.com/questions/59172333
复制相似问题