首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在ConsumerGroup中设置动物园管理员IP地址而不是卡夫卡-主机IP地址?

如何在ConsumerGroup中设置动物园管理员IP地址而不是卡夫卡-主机IP地址?
EN

Stack Overflow用户
提问于 2019-05-31 08:12:59
回答 2查看 844关注 0票数 0

我必须在ConsumerGroup中设置动物园管理员IP地址,而不是卡夫卡-主机IP地址.因为我已经将复制因子设置为3和3,所以创建了代理。因此,如果一个主机失败了,那么另一个主机就可以接管。

当我试图在ConsumerGroup中放置动物园管理员IP地址而不是Kafka-主机IP地址时,它没有收到任何来自Producer的消息。

代码语言:javascript
复制
var kafka = require('kafka-node')
var ConsumerGroup = kafka.ConsumerGroup

function createConsumerGroup () {
  var options = {
    kafkaHost: '127.0.0.1:9092',
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

  var consumerGroup = new ConsumerGroup(Object.assign({ id: 'demo-' + process.pid }, options), 'example')

  consumerGroup.on('message', function (message) {
    message.value = JSON.parse(message.value)
    console.log('Message Received')
  })
}

我想要的是,如果我在ConsumerGroup中传递动物园管理员IP地址,而不是Kafka-主机IP地址,那么它就应该接收生产者API在“示例”主题上发送的消息。如果一个代理失败了,那么它应该接收来自另一个代理的消息。当复制因子设置为3和3个时,创建了代理。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-06-01 11:30:47

好的,问题是consumerGroup选项对象。

我们必须在options对象中的“主机”密钥中传递动物园管理员IP地址,而不是"kafkaHost“密钥。这解决了问题,并接收生产者API发送的所有数据。如果一个副本集失败,甚至会自动切换到另一个副本集。

代码语言:javascript
复制
var options = {
    kafkaHost: '127.0.0.1:9092',
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

下面的代码块修复了它。

代码语言:javascript
复制
var options = {
    host: '127.0.0.1:2181', // change in key & value
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

谢谢。

票数 0
EN

Stack Overflow用户

发布于 2019-05-31 12:59:54

卡夫卡0.9中引入的新消费者API不需要与动物园管理员连接。小组平衡现在由卡夫卡自己处理。因此,你必须提供卡夫卡主机,而不是动物园管理员主机(S)。

汇合式博客文章应该能提供更多的启示:

在Apache发布0.8.2版本时,它发布了重新设计的生产者客户端,我们也承诺重新设计消费者客户端。我们信守承诺: 0.9版引入了对新设计的消费者客户端的beta版支持。在高层次上,新消费者的主要区别在于,它消除了基于“高级”动物园管理员的消费者与“低级别”SimpleConsumer API之间的区别,而是提供了统一的消费API。 这个新的使用者是使用强大的新服务器端设施来实现的,这使得组管理成为Kafka协议的头等部分。这有几个优点。首先,它允许一个更可伸缩的组功能,这使得消费者客户端变得更简单和更薄,并且允许更大的组具有更快的再平衡。所有客户都可以使用这一设施;在C客户端librdkafka中使用它的工作已经接近完成。同样的工具对于管理、分布式、生产和使用Kafka中的数据非常有用;它是Kafka和几个即将到来的项目的基础。最后,这完成了在过去几年中完成的一系列项目,目的是将Kafka客户端与完全解耦,从而完全消除了消费者客户端对Zookeeper的依赖。动物园管理员仍然是卡夫卡使用,但它是一个实现细节的经纪人-客户谁使用这个新设施没有必要连接到动物园管理员在任何地方。这有许多操作上的好处,因为客户端现在总是通过代理提供的安全和配额机制工作。这大大简化了使用者,并为消费者API的一流非Java实现的出现打开了大门。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56391212

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档