首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >kafka 静态消费组成员

kafka 静态消费组成员

原创
作者头像
markzhang12
发布2021-02-08 11:28:50
发布2021-02-08 11:28:50
1.7K0
举报

kafka 静态消费组成员

kafka的消费者组机制一直很受诟病,原因是他的设计看起来是比较美好的,但是在实际使用过程中,由于各种业务本身的消费逻辑漫长或者用户的使用姿势问题,导致自身的消费者组经常陷入无限的重平衡中,而由于消费者组的STW机制也会导致同组内的其他消费者出现消费停止的情况。这种现象在越大的工业集群中越容易出现,所以为了改进这种现象,kafka从2.3版本开始提供了静态消费者组的机制。(云上ckafka可以购买专业版2.4 也可以支持本特性)

为什么需要

kafka的消费者组机制,可以支持某个程序故障退出了,剩下的消费者可以快速拥有退出消费者的分区,并继续消费。但是这里存在一些问题使得消费者组的实际表现并不怎么好,同时现代的程序架构下,并不需要kafka本身的消费组机制来达成故障恢复的能力。

  1. 消费者能力已经到顶了,如果再拥有退出消费者的分区,由于消费能力不够,导致不断触发重平衡,于是整个消费者组都没法继续消费。
  2. 消费者虽然退出了,但是由于现代程序架构下大家普遍使用了supervisor机制或者是运行在k8s上的pod,消费者可能很快就会回来,但是这个时候重平衡已经触发了,由于消费者回来,又会触发一次重平衡,这种情况下每次退出恢复都会导致两次重平衡的出现,这种不必要的重平衡在大型消费集群中出现是很难接受的。
  3. 快速的滚动升级,正常的程序迭代,由于每次发布都会导致服务的重启,触发整个消费者组的重平衡,这种情况在现代架构下看起来也是不必要的。
  4. kafka的消费者是不能超过分区数的,虽然在表面看来超过了分区数只是会有部分消费者无法拥有分区,但是从实际的生产环境来看,由于重平衡时多个消费者可能出现间歇性拥有某几个分区,然后在消费能力不足,且消费逻辑比较漫长的情况下,又出现反复重平衡。

基本原理

静态消费者组会尽量在 组成员发生一些变动的时候阻止消费者组状态从 STABLE 变换为 PREPARE_REBALANCE。

为了达成这样的目的,kafka在2.3版本修改了Group的多个API且更改了启动了静态消费者的客户端退出逻辑

  1. 加入group.instance.id 参数,用于识别静态消费者成员,一旦设定了这个参数消费者就会被认为是静态消费者
  2. 静态消费者退出的时候不再往服务端发生LeaveGroup请求,直到session超时,才会被剔除消费者组
  3. 加大了服务端的最大session超时,在服务端支持下,客户端的最大session超时可以设定为30分钟

静态消费者情况下重平衡逻辑及注意事项

  1. 消费者组成员增加,会触发重平衡
  2. session超时会触发重平衡(这里session超时配置建议是基于能够容忍不可用的时间来配置,尽量延长为重启的程序和消费慢的程序留出时间)
  3. max.poll.interval.ms 始终大于 session.timeout.ms 如果session timeout为5min,那么poll.interval.ms也要大于5min
  4. 客户端程序必须要自己确保group.instance.id的唯一性,重复的group.instance.id加入同一个消费者组会报错
  5. 目前已知java官方客户端(2.3以上)和Librdkafka(1.4.0以上) 支持本特性,sarama暂时不支持

附录代码

代码语言:txt
复制
#include <iostream>
#include <librdkafka/rdkafkacpp.h>

int main()
{
    std::string err;
    std::vector<std::string> topics;
    auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "vip:port", err);
    conf->set("enable.partition.eof", "false", err);
    conf->set("group.id", "markstatic", err);
    conf->set("group.instance.id", "consumer-450", err); // instance.id必须唯一
    conf->set("session.timeout.ms", "600000", err); // sesion timeout 为能够忍受的分区不可用最长时间
    conf->set("max.poll.interval.ms", "600500", err); // poll.interval.ms需要大于 sesion timeout
    
    auto consumer = RdKafka::KafkaConsumer::create(conf, err);
    if (!consumer)
    {
        std::cerr << "Failed to create consumer: " << err << std::endl;
        exit(1);
    }
    topics.push_back("mytest1");
    auto suberr = consumer->subscribe(topics);
    if (suberr)
    {
        std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
                  << RdKafka::err2str(suberr) << std::endl;
        exit(1);
    }
    while (true)
    {
        auto msg = consumer->consume(1000);
        std::cout << " Message in " << msg->topic_name() << " [" << msg->partition() << "] at offset " << msg->offset() << std::endl;
        delete msg; 
    }
    
    
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kafka 静态消费组成员
    • 为什么需要
    • 基本原理
    • 静态消费者情况下重平衡逻辑及注意事项
    • 附录代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档