我使用@KafkaListener作为道具
max.poll.records到50。(每条记录需要40至60秒的时间处理)
启用-自动提交=false
进入手动即时模式
下面是逻辑
@KafkaListener(groupId=“ABC”, topic=“Data1” containerFactory=“myCustomContainerFactory”)
public void listen(ConsumerRecord<String, Object> record, Acknowledge ack) {
try{
process(record);
ack.acknowledge();
}
Catch(e){
reprocess() // pause container and seek
}
}其他道具,如max.poll.interval.ms、session.timeout.ms或心跳都是默认值。
我无法理解这里到底出了什么问题,
假设将500 msg发布到2个分区。
发布时,它的所有500 msg都会被投票。
一个小时后,日志文件显示相同的消息被多次读取。
任何帮助都是非常感谢的。
发布于 2021-05-28 16:17:56
默认max.poll.interval.ms为300,000毫秒(5分钟)。
你要么需要减少max.poll.records,要么增加间隔--否则卡夫卡就会因为一个反应迟钝的消费者而强制进行再平衡。
在处理时间如此长的情况下,我建议使用max.poll.records=1;您显然不需要更高的吞吐量。
https://stackoverflow.com/questions/67741756
复制相似问题