
kafka的消费者组机制一直很受诟病,原因是他的设计看起来是比较美好的,但是在实际使用过程中,由于各种业务本身的消费逻辑漫长或者用户的使用姿势问题,导致自身的消费者组经常陷入无限的重平衡中,而由于消费者组的STW机制也会导致同组内的其他消费者出现消费停止的情况。这种现象在越大的工业集群中越容易出现,所以为了改进这种现象,kafka从2.3版本开始提供了静态消费者组的机制。(云上ckafka可以购买专业版2.4 也可以支持本特性)
kafka的消费者组机制,可以支持某个程序故障退出了,剩下的消费者可以快速拥有退出消费者的分区,并继续消费。但是这里存在一些问题使得消费者组的实际表现并不怎么好,同时现代的程序架构下,并不需要kafka本身的消费组机制来达成故障恢复的能力。
静态消费者组会尽量在 组成员发生一些变动的时候阻止消费者组状态从 STABLE 变换为 PREPARE_REBALANCE。
为了达成这样的目的,kafka在2.3版本修改了Group的多个API且更改了启动了静态消费者的客户端退出逻辑
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
int main()
{
std::string err;
std::vector<std::string> topics;
auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "vip:port", err);
conf->set("enable.partition.eof", "false", err);
conf->set("group.id", "markstatic", err);
conf->set("group.instance.id", "consumer-450", err); // instance.id必须唯一
conf->set("session.timeout.ms", "600000", err); // sesion timeout 为能够忍受的分区不可用最长时间
conf->set("max.poll.interval.ms", "600500", err); // poll.interval.ms需要大于 sesion timeout
auto consumer = RdKafka::KafkaConsumer::create(conf, err);
if (!consumer)
{
std::cerr << "Failed to create consumer: " << err << std::endl;
exit(1);
}
topics.push_back("mytest1");
auto suberr = consumer->subscribe(topics);
if (suberr)
{
std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
<< RdKafka::err2str(suberr) << std::endl;
exit(1);
}
while (true)
{
auto msg = consumer->consume(1000);
std::cout << " Message in " << msg->topic_name() << " [" << msg->partition() << "] at offset " << msg->offset() << std::endl;
delete msg;
}
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。