
What should a zombie Kafka consumer do after a rebalance?
Stack Overflow user
Asked 2022-07-07 02:43:22
1 answer · 177 views · 0 followers · score 1

I ran a small experiment that demonstrates this when using a batch consumer: after a rebalance, old messages can overwrite new ones.

Consumer1, with a 30s max-poll-interval:

```java
@KafkaListener(id = "t1", topics = "test", batch = "true")
public void listen(List<String> records) throws InterruptedException {
    // mimic a long GC pause
    LOGGER.info("received size {}", records.size());
    Thread.sleep(80000L);
    // processed 1 record and died
    LOGGER.info("processed {}", records.get(0));
    System.exit(0);
}
```

Consumer2, which processes records normally:

```java
@KafkaListener(id = "t1", topics = "test", batch = "true")
public void listen(List<String> records) throws InterruptedException {
    for (String record : records) {
        LOGGER.info("processed {}", record);
    }
}
```

Before starting either consumer, send 2 records with values 1 and 2 respectively to the topic.

  • Then start consumer1

  • When "received size 2" is logged, start consumer2

  • consumer1 leaves the group because its sleep exceeds max-poll-interval

  • consumer2 takes over the partitions and processes records 1 and 2

  • consumer1 wakes from its sleep, processes record 1, and dies

  • The last record processed is 1
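The race in these steps can be simulated without Kafka. Below is a pure-JDK sketch (not the actual listener code above); `lastProcessed` stands in for whatever downstream state the consumers write, and the 200ms sleep stands in for the 80s pause:

```java
import java.util.concurrent.atomic.AtomicReference;

// Simulates the timeline above: consumer1 stalls mid-batch, consumer2
// reprocesses both records, then consumer1 wakes up and writes a stale value.
class ZombieRace {
    static final AtomicReference<String> lastProcessed = new AtomicReference<>();

    static String run() {
        Thread consumer1 = new Thread(() -> {
            sleep(200); // "GC pause" longer than max.poll.interval.ms
            lastProcessed.set("1"); // the zombie's stale write lands last
        });
        Thread consumer2 = new Thread(() -> {
            lastProcessed.set("1"); // legitimate reprocessing after rebalance
            lastProcessed.set("2");
        });
        consumer1.start();
        consumer2.start();
        join(consumer1);
        join(consumer2);
        return lastProcessed.get();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    static void join(Thread t) {
        try { t.join(); } catch (InterruptedException ignored) {}
    }

    public static void main(String[] args) {
        System.out.println("last processed = " + run());
    }
}
```

With these timings, consumer2 finishes immediately while consumer1 wakes 200ms later, so the stale "1" wins, mirroring the Kafka experiment.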

Here are the logs of the two consumers. consumer1:

```
2022-07-07 10:28:13.025|[]|t1-0-C-1|INFO |TestRecordHandler.java:55|received size 2
2022-07-07 10:28:28.012|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:31.076|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:34.149|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:37.224|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:40.293|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: group is already rebalancing
2022-07-07 10:28:43.031|[]|kafka-coordinator-heartbeat-thread | t1|WARN |AbstractCoordinator.java:1396|[Consumer clientId=consumer-t1-7, groupId=t1] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2022-07-07 10:28:43.031|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:1048|[Consumer clientId=consumer-t1-7, groupId=t1] Member consumer-t1-7-a26e8418-3ff8-46b3-8152-17dbe88b620a sending LeaveGroup request to coordinator 100.85.230.15:9095 (id: 2147483645 rack: null) due to consumer poll timeout has expired.
2022-07-07 10:28:43.033|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:966|[Consumer clientId=consumer-t1-7, groupId=t1] Resetting generation due to: consumer pro-actively leaving the group
2022-07-07 10:28:43.033|[]|kafka-coordinator-heartbeat-thread | t1|INFO |AbstractCoordinator.java:988|[Consumer clientId=consumer-t1-7, groupId=t1] Request joining group due to: consumer pro-actively leaving the group
2022-07-07 10:29:33.028|[]|t1-0-C-1|INFO |TestRecordHandler.java:57|processed 1
```

consumer2:

```
2022-07-07 10:29:25.296|[]|t1-0-C-1|INFO |LogAccessor.java:292|t1: partitions assigned: [test-1, test-0, test-2]
2022-07-07 10:29:25.830|[]|t1-0-C-1|INFO |TestRecordHandler.java:87|processed 1
2022-07-07 10:29:25.830|[]|t1-0-C-1|INFO |TestRecordHandler.java:87|processed 2
```

1 Answer

Stack Overflow user

Accepted answer

Answered 2022-07-07 08:00:09

Thread.sleep(80000) (which I guess is a stand-in for a long-running process) is actually breaking the contract that, at any point in time, each topic partition should have only one (live) consumer.

As far as Kafka is concerned, this is exactly what happened in your test: consumer1 timed out, so it is considered dead, and the only (valid) consumer is now consumer2. If consumer1 tried to interact with the Kafka cluster at that point, most operations would fail, since it is no longer registered for those partitions.

When consumer1 comes back alive after its 80s sleep, it behaves like a zombie: the Kafka cluster considers it dead, yet it keeps processing the data it had previously read and kept in memory. You don't want that to happen.

What you want it to do instead is discard its data and die. In Spring, I believe one way to achieve this is to implement ConsumerAwareRebalanceListener and react to onPartitionsRevoked: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.html
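A minimal sketch of that discard-on-revoke idea, using only the JDK (the Spring wiring is omitted; `onPartitionsRevoked()` here is a hypothetical stand-in for the callback Spring would invoke on the ConsumerAwareRebalanceListener):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a batch handler that drops the rest of its in-memory batch
// once its partitions are revoked, instead of finishing as a zombie.
class DiscardOnRevokeHandler {
    private final AtomicBoolean revoked = new AtomicBoolean(false);

    // Spring would call this from onPartitionsRevoked(...)
    void onPartitionsRevoked() {
        revoked.set(true);
    }

    // Returns the records actually processed.
    List<String> process(List<String> batch) {
        List<String> processed = new ArrayList<>();
        for (String record : batch) {
            if (revoked.get()) {
                break; // partitions are gone: stop and discard the rest
            }
            processed.add(record);
        }
        return processed;
    }
}
```

Checking the flag between records (rather than once per batch) keeps the zombie window small even for large batches.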

Another approach is to make sure max-poll-interval is always large enough for the long processing: put an upper bound on the duration of the lengthy step, and set max-poll-interval above that bound.
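For instance, if the lengthy step is known to be bounded at 2 minutes (an assumed bound; pick your own), the raw consumer property could be set like this (the same value can be passed through Spring Boot's consumer properties):

```java
import java.util.Properties;

// Assumed bound: batch processing never exceeds 2 minutes, so allow 3.
class PollIntervalConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        // must exceed the worst-case batch processing time
        props.setProperty("max.poll.interval.ms", "180000");
        return props;
    }
}
```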

Zombies are a common problem in distributed systems: from the other side of the network, there is no way to distinguish an application that simply takes a long time to respond from one that has crashed, hit a network issue, been restarted by K8s, and so on. Timeouts are therefore pretty much the main tool we have, and we should make sure zombies kill themselves so that the system converges back to a healthy state.

Score 2
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/72891741
