首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KAFKA节点:管理多个消费群体

KAFKA节点:管理多个消费群体
EN

Stack Overflow用户
提问于 2018-07-12 14:04:16
回答 2查看 3.7K关注 0票数 2

我有一个关于管理多个CG的问题,创建了三个消费者组,每个CG都有自己的kafka服务、组Id和主题。

现在我收到了预期的消息,但是,我想知道是否有可能创建下一个场景:

创建三个消费者组,但只接收来自一个用户的消息,暂时暂停/暂停其他用户组,如果他的kafka服务将下降,则使用下一个消费者组的消息,与第三个用户组相同。

下面是我的代码示例:

代码语言:javascript
复制
function createConsumerGroup(topics){

    const ConsumerGroup = kafka.ConsumerGroup;

    //CREATE CONSUMER GROUPS FOR EVERY SERVICE
    for(let i = 0; i < config.kafka_service.length ;i++){  //3

        const options = {
            groupId: config.kafka_service[i]['groupId'],
            host: config.kafka_service[i]['zookeeperHost'],
            kafkaHost: config.kafka_service[i]['kafkaHost'],
            sessionTimeout: 15000,
            protocol: ['roundrobin'],
            fromOffset: 'latest'
        }

        //assign all services CG names and create [i] consumer groups!
        let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

        customConsumerGroupName = new ConsumerGroup(options, topics);

        customConsumerGroupName.on('connect', (resp) => {
            console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
        });

        if(i > 0){
            //pause consumers exept FIRST
            customConsumerGroupName.pause();
        }


        customConsumerGroupName.on('message', (message) => {
           console.log(message);
        });

        customConsumerGroupName.on('error', (error) => {
            console.log('consumer group error: ', error);

           //HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
           //MAYBE consumerGroup.resume(); ???
        });

    }
}

希望这听起来可以理解,谢谢

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-07-16 15:22:29

因此,这似乎是因为Node包的名称“ConsumerGroup”而引起的混乱。用卡夫卡的术语来说,消费者群体完全由每个消费者使用的groupId控制。具有相同groupId的使用者将不会获得重复的消息,每个主题消息只由单个使用者读取。如果消费者下降,kafka会检测到这一点,并将其分区给单独的消费者。

Node 'ConsumerGroup‘实际上只是另一个卡夫卡消费者(卡夫卡管理的新消费者,而不是卡夫卡>0.9的动物园管理员)。

因此,利用Node ConsumerGroup来利用卡夫卡消费者群体的方法如下:

代码语言:javascript
复制
function createConsumerGroup(topics){

const ConsumerGroup = kafka.ConsumerGroup;

//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){  //3

    const options = {
        groupId: 'SOME_GROUP_NAME',
        host: config.kafka_service[i]['zookeeperHost'],
        kafkaHost: config.kafka_service[i]['kafkaHost'],
        sessionTimeout: 15000,
        protocol: ['roundrobin'],
        fromOffset: 'latest'
    }

    //assign all services CG names and create [i] consumer groups!
    let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

    customConsumerGroupName = new ConsumerGroup(options, topics);

    customConsumerGroupName.on('connect', (resp) => {
        console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
    });

    customConsumerGroupName.on('message', (message) => {
       console.log(message);
    });

    customConsumerGroupName.on('error', (error) => {
        console.log('consumer group error: ', error);

       //Error handling logic here, restart the consumer that failed perhaps? 
       //Depends on how you want to managed failed consumers.
    });
  }
}

节点ConsumerGroup的每个实例都将是组‘same’的成员,使用该groupId创建的任何其他使用者也将充当同一个kafka使用者组的成员,而不管服务器等。

票数 4
EN

Stack Overflow用户

发布于 2018-07-13 09:32:51

消费者群体解决了两种核心方案:

1.缩放可以增加组中的使用者数量,以处理组正在消费的主题中正在生成的消息的不断增加的速率(扩展)

2.故障转移通过让一组消费者阅读相同的主题,他们将自动处理一个或多个消费者下降的情况。

所以,与其有“袖手旁观”的消费者群体,你必须处理哪些是活跃的你自己,你只是依赖于卡夫卡的内置故障转移。消费者可以在几个不同的容器中运行(甚至在不同的数据中心),而且Kafka将自动确保消息被传递给每个消费者,不管他们在哪里或者他们中有多少人在任何特定的时间运行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51307720

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档