使用kafka-python向主题发送大量消息。成功地将部分消息发送到主题,但并非所有消息都是在程序以下列错误消息终止之前发送的:
KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
self._complete_batch(batch, error, -1, None)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
self._accumulator.deallocate(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
self._incomplete.remove(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
return self._incomplete.remove(batch)在我的主题中,每次运行都会收到不同数量的消息。问题似乎是,卡夫卡producer.send的电话没有完成发送之前,该程序达到其结束。
根据kafka文档,producer.send是一种异步方法,这可能是根本原因--并非所有异步线程都在进程终止前完成发送:
send()方法是异步的。调用时,它将记录添加到挂起的记录的缓冲区中,发送并立即返回。这允许生产者对单个记录进行批次,以提高效率。
有许多简单的解决方案(例如将batch.size设置为低数量)可能会导致性能瓶颈。
如何在不影响性能的情况下解决这个问题?
发布于 2017-11-15 16:03:48
出口前打电话给producer.flush()。
https://stackoverflow.com/questions/46672837
复制相似问题