首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡不断重新平衡消费者

卡夫卡不断重新平衡消费者
EN

Stack Overflow用户
提问于 2019-06-18 18:50:21
回答 2查看 5.3K关注 0票数 9

我们在一个组中有10个消费者在听一个主题。经常发生的情况是看到消费者经常被重新平衡(这会在一段时间内完全停止消费者过程)。

代码语言:javascript
复制
# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092  --describe  --group ParserKafkaPipeline | grep -e ParserBody | sort
ParserBodyToParse 0          99              99              0               consumer-1-f29b7eb7-b871-477c-af52-446fbf4b0496  /10.12.18.58    consumer-1
ParserBodyToParse 1          97              97              0               consumer-10-6639ee02-8e68-40e6-aca1-eabd89bf828e /10.12.18.58    consumer-10
ParserBodyToParse 2          97              97              0               consumer-11-c712db8b-0396-4388-9e3a-e8e342355547 /10.12.18.58    consumer-11
ParserBodyToParse 3          97              98              1               consumer-12-0cc6fe12-d640-4344-91c0-f15e63c20cca /10.12.18.58    consumer-12
ParserBodyToParse 4          97              98              1               consumer-13-b904a958-141d-412e-83ea-950cd51e25e0 /10.12.18.58    consumer-13
ParserBodyToParse 5          97              98              1               consumer-14-7c70ba88-8b8c-4fad-b15b-cf7692a4b9ce /10.12.18.58    consumer-14
ParserBodyToParse 6          98              98              0               consumer-15-f0983c3d-8704-4127-808d-ec8b6b847008 /10.12.18.58    consumer-15
ParserBodyToParse 7          97              97              0               consumer-18-de5d20dd-217c-4db2-9b39-e2fdbca386e9 /10.12.18.58    consumer-18
ParserBodyToParse 8          98              98              0               consumer-5-bdeaf30a-d2bf-4aec-86ea-9c35a7acfe21  /10.12.18.58    consumer-5
ParserBodyToParse 9          98              98              0               consumer-9-4de1bf17-9474-4bd4-ae61-4ab254f52863  /10.12.18.58    consumer-9

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092  --describe  --group ParserKafkaPipeline | grep -e ParserBody | sort
Warning: Consumer group 'ParserKafkaPipeline' is rebalancing.
ParserBodyToParse 0          99              99              0               -               -               -
ParserBodyToParse 1          99              99              0               -               -               -
ParserBodyToParse 2          99              99              0               -               -               -
ParserBodyToParse 3          99              100             1               -               -               -
ParserBodyToParse 4          99              100             1               -               -               -
ParserBodyToParse 5          99              100             1               -               -               -
ParserBodyToParse 6          100             100             0               -               -               -
ParserBodyToParse 7          99              99              0               -               -               -
ParserBodyToParse 8          100             100             0               -               -               -
ParserBodyToParse 9          100             100             0               -               -               -

注意上面第二个调用中的警告。

使用这些消息可能需要很长时间,但应该不会超过两分钟。我检查了consumer.poll的限制是5分钟,这应该不是问题。是否有一些日志来检查到底发生了什么?

更新:

我们使用Kafka 2.2.1和Java consumer。我们没有更改max.sessionmax.heartbeat的默认值。使用者基本上是在等待来自其他服务的IO,所以它没有使用任何CPU -这就是为什么我期望心跳应该正常工作的原因。

我们的消费者代码如下:

代码语言:javascript
复制
    inline fun <reified T : Any> consume(
            topic: KafkaTopic,
            groupId: String,
            batchSize: Int = 50,
            crossinline consume: (key: String?, value: T) -> (Unit)
    ) = thread {
        val consumerProperties = Properties()
        consumerProperties.putAll(properties)
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize)

        val consumer = KafkaConsumer<String?, ByteArray>(consumerProperties)

        consumer.subscribe(listOf(topic.toString()))

        while (true) try {
            val records = consumer.poll(Duration.ofMinutes(pollDurationMinutes))
            log.debug("Topic $topic consumed by group $groupId: ${records.count()} records.")
            records.forEach { record -> consumeRecord(record, topic, consume) }
        } catch (e: Exception) {
            log.fatal("Couldn't consume records: ${e.message}.", e)
            // sleep to prevent logging hell when connection failure
            Thread.sleep(1000)
        }
    }
EN

回答 2

Stack Overflow用户

发布于 2019-06-22 00:11:12

频繁的重新平衡通常是因为消费者处理批量的时间太长而导致的。这是因为消费者正在处理批处理了很长一段时间(并且心跳没有被发送),因此代理认为消费者丢失了,并开始重新平衡。

我建议通过减少max.partition.fetch.bytes的值来创建较小的批,或者通过增加heartbeat.interval.ms的值来延长/增加心跳间隔。

票数 4
EN

Stack Overflow用户

发布于 2020-03-27 21:09:54

我认为Giorgos答案的第一部分是正确的,直到“.processing the batch for long time”,但配置建议是针对不同的问题。

造成再平衡的原因有两个,一是投票间隔过长,二是心跳间隔过长。日志应该会告诉您是哪个导致了重新平衡,但通常是前者。

如果问题是心跳,那么建议的配置更改可能会有所帮助,和/或session.timeout.ms。心跳在单独的线程中运行,并允许组快速确定使用者应用程序是否已死亡。

如果两次轮询之间的问题太长,并且您无法加快处理速度,则需要增加调用轮询之间的允许间隔,或者减少每次轮询中处理的记录数量。相关属性为max.poll.interval (默认为5分钟)或max.poll.records (默认为500)

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56647442

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档