首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KeyError: kafka.producer.record_accumulator.RecordBatch

KeyError: kafka.producer.record_accumulator.RecordBatch
EN

Stack Overflow用户
提问于 2017-10-10 17:44:47
回答 1查看 2.6K关注 0票数 1

使用kafka-python向主题发送大量消息。成功地将部分消息发送到主题,但并非所有消息都是在程序以下列错误消息终止之前发送的:

代码语言:javascript
复制
KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
    self._complete_batch(batch, error, -1, None)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
    self._accumulator.deallocate(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
    self._incomplete.remove(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
    return self._incomplete.remove(batch)

在我的主题中,每次运行都会收到不同数量的消息。问题似乎是,卡夫卡producer.send的电话没有完成发送之前,该程序达到其结束。

根据kafka文档,producer.send是一种异步方法,这可能是根本原因--并非所有异步线程都在进程终止前完成发送:

send()方法是异步的。调用时,它将记录添加到挂起的记录的缓冲区中,发送并立即返回。这允许生产者对单个记录进行批次,以提高效率。

有许多简单的解决方案(例如将batch.size设置为低数量)可能会导致性能瓶颈。

如何在不影响性能的情况下解决这个问题?

EN

回答 1

Stack Overflow用户

发布于 2017-11-15 16:03:48

出口前打电话给producer.flush()

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46672837

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档