我有一个关于管理多个CG的问题,创建了三个消费者组,每个CG都有自己的kafka服务、组Id和主题。
现在我收到了预期的消息,但是,我想知道是否有可能创建下一个场景:
创建三个消费者组,但只接收来自一个用户的消息,暂时暂停/暂停其他用户组,如果他的kafka服务将下降,则使用下一个消费者组的消息,与第三个用户组相同。
下面是我的代码示例:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){ //3
const options = {
groupId: config.kafka_service[i]['groupId'],
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
if(i > 0){
//pause consumers exept FIRST
customConsumerGroupName.pause();
}
customConsumerGroupName.on('message', (message) => {
console.log(message);
});
customConsumerGroupName.on('error', (error) => {
console.log('consumer group error: ', error);
//HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
//MAYBE consumerGroup.resume(); ???
});
}
}希望这听起来可以理解,谢谢
发布于 2018-07-16 15:22:29
因此,这似乎是因为Node包的名称“ConsumerGroup”而引起的混乱。用卡夫卡的术语来说,消费者群体完全由每个消费者使用的groupId控制。具有相同groupId的使用者将不会获得重复的消息,每个主题消息只由单个使用者读取。如果消费者下降,kafka会检测到这一点,并将其分区给单独的消费者。
Node 'ConsumerGroup‘实际上只是另一个卡夫卡消费者(卡夫卡管理的新消费者,而不是卡夫卡>0.9的动物园管理员)。
因此,利用Node ConsumerGroup来利用卡夫卡消费者群体的方法如下:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){ //3
const options = {
groupId: 'SOME_GROUP_NAME',
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
customConsumerGroupName.on('message', (message) => {
console.log(message);
});
customConsumerGroupName.on('error', (error) => {
console.log('consumer group error: ', error);
//Error handling logic here, restart the consumer that failed perhaps?
//Depends on how you want to managed failed consumers.
});
}
}节点ConsumerGroup的每个实例都将是组‘same’的成员,使用该groupId创建的任何其他使用者也将充当同一个kafka使用者组的成员,而不管服务器等。
发布于 2018-07-13 09:32:51
消费者群体解决了两种核心方案:
1.缩放可以增加组中的使用者数量,以处理组正在消费的主题中正在生成的消息的不断增加的速率(扩展)
2.故障转移通过让一组消费者阅读相同的主题,他们将自动处理一个或多个消费者下降的情况。
所以,与其有“袖手旁观”的消费者群体,你必须处理哪些是活跃的你自己,你只是依赖于卡夫卡的内置故障转移。消费者可以在几个不同的容器中运行(甚至在不同的数据中心),而且Kafka将自动确保消息被传递给每个消费者,不管他们在哪里或者他们中有多少人在任何特定的时间运行。
https://stackoverflow.com/questions/51307720
复制相似问题