I ran a small experiment with a batch consumer that demonstrates this: around a rebalance, old messages can overwrite newer ones.

Consumer1, with a 30s max-poll-interval:
@KafkaListener(id = "t1", topics = "test", batch = "true")
public void listen(List<String> records) throws InterruptedException {
// mimic gc
LOGGER.info("received size {}", records.size());
Thread.sleep(80000L);
// processed 1 record and died
LOGGER.info("processed {}", records.get(0));
System.exit(0);
}Consumer2,通常处理:
@KafkaListener(id = "t1", topics = "test", batch = "true")
public void listen(List<String> records) throws InterruptedException {
for (String record : records) {
LOGGER.info("processed {}", record);
}
}在两个使用者启动前,将2条记录发送到具有值1和2的respectively.
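For completeness, the two records could be produced as sketched below; the original post does not show the producer, so TestSeeder and the injected KafkaTemplate bean are assumptions:

import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical helper; assumes a KafkaTemplate<String, String> bean is available.
public class TestSeeder {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TestSeeder(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void seed() {
        // Two records, values "1" and "2", sent before either consumer starts.
        kafkaTemplate.send("test", "1");
        kafkaTemplate.send("test", "2");
    }
}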
When "received size 2" is logged, start consumer2, so that a rebalance is triggered once consumer1 exceeds its max-poll-interval.
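For reference, here is a guess at how consumer1's 30s interval could be set up. These are standard Spring Boot property names, but the original post does not show its configuration, so treat this as an assumption:

# application.properties (assumed; not shown in the original post)
# 30s poll interval for consumer1:
spring.kafka.consumer.properties.max.poll.interval.ms=30000
# presumably needed so records sent before the consumers start are visible to the new group:
spring.kafka.consumer.auto-offset-reset=earliest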
Here are the logs of the two consumers. consumer1:
2022-07-07 10:28:13.025|[]|t1-0-C-1|INFO |TestRecordHandler.java:55|received size 2
2022-07-07 10:28:28.012|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:31.076|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:34.149|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:37.224|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:40.293|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:43.031|[]|kafka-coordinator-heartbeat-thread | t1|WARN |AbstractCoordinator.java:1396|[Consumer clientId=consumer-t1-7, groupId=t1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2022-07-07 10:28:43.031|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:1048|[Consumer clientId=consumer-t1-7, groupId=t1] Member consumer-t1-7-a26e8418-3ff8-46b3-8152-17dbe88b620a sending LeaveGroup request to coordinator 100.85.230.15:9095 (id: 2147483645 rack: null) due to consumer poll timeout has expired.
2022-07-07 10:28:43.033|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:966|[Consumer clientId=consumer-t1-7, groupId=t1] Resetting generation due to: consumer pro-actively leaving the group
2022-07-07 10:28:43.033|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: consumer pro-actively leaving the group
2022-07-07 10:29:33.028|[]|t1-0-C-1|INFO |TestRecordHandler.java:57|processed 1

consumer2:
2022-07-07 10:29:25.296|[]|t1-0-C-1|INFO |LogAccessor.java:292|t1: partitions assigned: [test-1, test-0, test-2]
2022-07-07 10:29:25.830|[]|t1-0-C-1|INFO |TestRecordHandler.java:87|processed 1
2022-07-07 10:29:25.830|[]|t1-0-C-1|INFO |TestRecordHandler.java:87|processed 2

Posted on 2022-07-07 08:00:09
Thread.sleep(80000) (which I take to be a stand-in for some long-running work) is actually breaking the contract that, at any point in time, each topic partition should have at most one live consumer within a consumer group.
As far as Kafka is concerned, this still holds in your test: because consumer1 did not poll within its max-poll-interval, it is considered dead, and the only (valid) consumer is now consumer2. If consumer1 then tried to interact with the Kafka cluster, most operations would fail, since it is no longer registered to those partitions.
While consumer1 is in fact still alive after the 80 seconds, it behaves like a zombie: the Kafka cluster considers it dead, yet it keeps processing the data it read earlier and held in memory. You do not want that to happen.
What you want instead is for it to discard its data and die. In Spring, I believe one way to do this is to implement ConsumerAwareRebalanceListener and react to onPartitionsRevoked: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.html
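A minimal sketch of that idea follows. The class and field names are hypothetical, and it assumes the batch listener checks the flag between records so it can abandon its in-memory data:

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Hypothetical sketch: remembers that a revocation happened so that
// long-running batch code can abandon stale in-memory data instead of
// finishing as a zombie.
public class AbortOnRevokeListener implements ConsumerAwareRebalanceListener {

    private final AtomicBoolean revoked = new AtomicBoolean(false);

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // The partitions are gone; anything read from them is now stale.
        revoked.set(true);
    }

    // The batch listener would check this between records and bail out when true.
    public boolean isRevoked() {
        return revoked.get();
    }
}

It could be registered on the listener container factory with factory.getContainerProperties().setConsumerRebalanceListener(new AbortOnRevokeListener()), and the record loop in consumer2 would call isRevoked() before each record.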
Another approach is to make sure max-poll-interval is always comfortably larger than the long processing: bound the duration of the lengthy work, and set max-poll-interval above that bound.
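Here is a sketch of that bounding, with hypothetical names and budgets; the 300s value mirrors the consumer default for max.poll.interval.ms, and the 30s margin is arbitrary:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: run the batch work with a hard timeout that stays
// below max.poll.interval.ms, so the consumer is never fenced mid-batch.
public class BoundedBatchHandler {

    private static final long MAX_POLL_INTERVAL_MS = 300_000L; // must match the consumer config
    private static final long PROCESSING_BUDGET_MS = MAX_POLL_INTERVAL_MS - 30_000L; // safety margin

    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    public void handle(List<String> records) throws Exception {
        Future<?> task = worker.submit(() -> process(records));
        try {
            // Give up before Kafka would consider this consumer dead.
            task.get(PROCESSING_BUDGET_MS, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            task.cancel(true); // interrupt the would-be zombie instead of letting it finish late
            throw e;
        }
    }

    private void process(List<String> records) {
        records.forEach(record -> {
            // real per-record work goes here
        });
    }
}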
Zombies are a common problem in distributed systems: from the other side of the network, an application that simply takes a long time to respond is indistinguishable from one that died because of a crash, a network issue, Kubernetes restarting it, and so on. Timeouts are therefore pretty much the main tool we have, and we should make sure zombies kill themselves so that the system converges back to a healthy state.
https://stackoverflow.com/questions/72891741