我们在重载生产中使用Spring。我们使用了@KafkaListener注释,并创建了这些侦听器,作为Services的一部分。
这些使用者经常向协调器发送LeaveGroup请求,然后使用者无限期地挂起/停留,没有任何日志或错误。在这种情况下,我们剩下的唯一选择是重新部署该特定实例。
这是我们看到的一系列日志:
Attempt to heartbeat failed since group is rebalancing
Attempt to heartbeat failed since group is rebalancing
This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
Member consumer-1-0d0b333d-9e5b-4038-ac3f-e5d59c4e9d19 sending LeaveGroup request to coordinator 172.25.128.233:9092 (id: 2147483560 rack: null)更多信息:我们正在使用下面的Kafka Con图:
<代码>H 110会话超时: 25分钟超时: 20分钟<代码>H 213<代码>F 214>
基本上,我们想知道的是,一旦消费者离开这个小组,它为什么不再次发送连接请求?
发布于 2021-07-19 22:45:27
您应该理解重试挂起使用者线程(如果使用BackOffPolicy )。在重试期间没有对Consumer.poll()的调用。卡夫卡有两个属性来决定消费者的健康。session.timeout.ms用于确定使用者是否处于活动状态。由于kafka-客户端版本为0.10.1.0,心跳在后台线程上发送,因此慢用户不再影响这一点。max.poll.interval.ms (默认值:5分钟)用于确定消费者是否出现挂起(处理上次投票记录花费的时间太长)。如果轮询()调用之间的时间超过这一时间,代理将撤销分配的分区并执行重新平衡。对于冗长的重试序列,后退,这很容易发生。
从2.1.3版本开始,您可以通过与SeekToCurrentErrorHandler一起使用有状态重试来避免此问题。在这种情况下,每次传递尝试都会将异常抛回容器,错误处理程序重新查找未处理的偏移量,下一次轮询()将重新传递相同的消息。这避免了超出max.poll.interval.ms属性的问题(只要尝试之间的单个延迟不超过它)。因此,在使用ExponentialBackOffPolicy时,必须确保maxInterval小于max.poll.interval.ms属性。要启用有状态重试,可以使用接受有状态布尔参数(将其设置为true)的RetryingMessageListenerAdapter构造函数。配置侦听器容器工厂(用于@KafkaListener)时,将工厂的statefulRetry属性设置为true。
https://docs.spring.io/spring-kafka/reference/html/#stateful-retry
https://stackoverflow.com/questions/68382377
复制相似问题