我已经创建了多线程用户应用程序来处理不同的分区。查看各种博客,我了解了“max.poll.records”属性,以便控制来自给定主题、分区的一组记录(这样它就可以快速地从记录循环中出来,从而调用cons.poll()来保持活力)
问题是我的处理逻辑需要时间来处理每条记录。在启动Cons-2时,它们都开始在相同的分区上工作,而Cons-1仍然没有进行重新平衡(即cons.poll()尚未发生)。
增加消费者,使他们能够重新平衡他们自己,cons.poll()将不会发生,除非所有的记录被处理。
我可能不会选择“session.timeout.ms”,因为启动新的使用者也可能开始在Cons-1的同一分区上工作。
我尝试使用以下方法设置属性:
props.put("max.poll.records",1);
props.put("max.poll.records","1");但两者都没有改变。民意测验的记录。
我正在使用Apache 9和下面的API。
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.9.0.1_1</version>
</dependency>发布于 2016-05-25 11:44:06
max.poll.records属性在Kafka-0.10.0中发布。卡夫卡0.9.0.1版中没有这一版本。参见发布注释中的KAFKA-3007任务。
如果您处理记录花费了很长时间,下面的链接可能会有所帮助。
https://stackoverflow.com/questions/37413273
复制相似问题