我正在使用spring和Kafka来使用Kafka主题中的数据。我已将并发数配置为10。因此,不同的线程轮询代理以获取消息并处理这些消息。即使在经过一段时间(成功处理)之后,我们仍会将相同的消息返回给消费者的不同线程。我们能够在已配置的max.poll.interval.ms=1500000中处理收到的消息。
请查看下面配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。
group.id=ips-request-group //group id
enable.auto.commit=true // auto commit
auto.commit.interval.ms=60000 // auto commit inverval
session.timeout.ms=100000 // session time out
request.timeout.ms=610000 // request time out
fetch.max.wait.ms=500 //polling time out
heartbeat.interval.ms=20000 // heart beat interval
auto.offset.reset=latest //consuming latest messages.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer // key
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer //value
max.poll.records=10 // max polling records
max.poll.interval.ms=1500000 // max polling interval ms /*可以帮我解决重复接收消息给Kafka消费者的问题吗?
发布于 2020-04-25 01:49:35
要在处理每条记录后提交偏移量,请设置auto.commit.enabled=false并将容器属性ackMode设置为AckMode.RECORD。
https://stackoverflow.com/questions/61411705
复制相似问题