我必须在ConsumerGroup中设置动物园管理员IP地址,而不是卡夫卡-主机IP地址.因为我已经将复制因子设置为3和3,所以创建了代理。因此,如果一个主机失败了,那么另一个主机就可以接管。
当我试图在ConsumerGroup中放置动物园管理员IP地址而不是Kafka-主机IP地址时,它没有收到任何来自Producer的消息。
var kafka = require('kafka-node')
var ConsumerGroup = kafka.ConsumerGroup
function createConsumerGroup () {
var options = {
kafkaHost: '127.0.0.1:9092',
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}
var consumerGroup = new ConsumerGroup(Object.assign({ id: 'demo-' + process.pid }, options), 'example')
consumerGroup.on('message', function (message) {
message.value = JSON.parse(message.value)
console.log('Message Received')
})
}我想要的是,如果我在ConsumerGroup中传递动物园管理员IP地址,而不是Kafka-主机IP地址,那么它就应该接收生产者API在“示例”主题上发送的消息。如果一个代理失败了,那么它应该接收来自另一个代理的消息。当复制因子设置为3和3个时,创建了代理。
发布于 2019-06-01 11:30:47
好的,问题是consumerGroup选项对象。
我们必须在options对象中的“主机”密钥中传递动物园管理员IP地址,而不是"kafkaHost“密钥。这解决了问题,并接收生产者API发送的所有数据。如果一个副本集失败,甚至会自动切换到另一个副本集。
var options = {
kafkaHost: '127.0.0.1:9092',
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}下面的代码块修复了它。
var options = {
host: '127.0.0.1:2181', // change in key & value
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}谢谢。
发布于 2019-05-31 12:59:54
卡夫卡0.9中引入的新消费者API不需要与动物园管理员连接。小组平衡现在由卡夫卡自己处理。因此,你必须提供卡夫卡主机,而不是动物园管理员主机(S)。
汇合式博客文章应该能提供更多的启示:
在Apache发布0.8.2版本时,它发布了重新设计的生产者客户端,我们也承诺重新设计消费者客户端。我们信守承诺: 0.9版引入了对新设计的消费者客户端的beta版支持。在高层次上,新消费者的主要区别在于,它消除了基于“高级”动物园管理员的消费者与“低级别”SimpleConsumer API之间的区别,而是提供了统一的消费API。 这个新的使用者是使用强大的新服务器端设施来实现的,这使得组管理成为Kafka协议的头等部分。这有几个优点。首先,它允许一个更可伸缩的组功能,这使得消费者客户端变得更简单和更薄,并且允许更大的组具有更快的再平衡。所有客户都可以使用这一设施;在C客户端librdkafka中使用它的工作已经接近完成。同样的工具对于管理、分布式、生产和使用Kafka中的数据非常有用;它是Kafka和几个即将到来的项目的基础。最后,这完成了在过去几年中完成的一系列项目,目的是将Kafka客户端与完全解耦,从而完全消除了消费者客户端对Zookeeper的依赖。动物园管理员仍然是卡夫卡使用,但它是一个实现细节的经纪人-客户谁使用这个新设施没有必要连接到动物园管理员在任何地方。这有许多操作上的好处,因为客户端现在总是通过代理提供的安全和配额机制工作。这大大简化了使用者,并为消费者API的一流非Java实现的出现打开了大门。
https://stackoverflow.com/questions/56391212
复制相似问题