首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在KafkaJS等待领导人选举

在KafkaJS等待领导人选举
EN

Stack Overflow用户
提问于 2020-08-24 17:58:22
回答 1查看 8.4K关注 0票数 7

形势

我正在使用卡夫卡伊编写一些动态生成的kafka主题。

我发现在注册我的制作人之后,立即给这些主题写信会经常导致一个错误:There is no leader for this topic-partition as we are in the middle of a leadership election

完全错误是:

代码语言:javascript
复制
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

“守则”

造成问题的代码如下:

代码语言:javascript
复制
import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  await producer.connect()
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

问题

两个问题:

  1. 在连接到生产者(或发送)时,我应该做什么特别的事情来确保逻辑块,直到生产者真正准备好将数据发送到一个kafka主题?
  2. 在向生产者发送数据以确保消息不被删除时,我应该做什么特别的事情吗?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-24 18:23:17

解决方案

Kafkajs通过具有可选的createTopics标志的管理客户端提供了一个waitForLeaders方法:

代码语言:javascript
复制
admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'myRandomTopicString123' },
  ],
}

使用它可以解决问题。

代码语言:javascript
复制
import kafka from 'myConfiguredKafkaJs'

const run = async () => {
  const producer = kafka.producer()
  const admin = kafka.admin()
  await admin.connect()
  await producer.connect()
  await admin.createTopics({
    waitForLeaders: true,
    topics: [
      { topic: 'myRandomTopicString123' },
    ],
  })
  producer.send({
    topic: 'myRandomTopicString',
    messages: [{
      value: 'yolo',
    }],
  })
}

run()

不幸的是,如果主题已经存在,这将导致一个不同的错误,但这是一个单独的问题,我怀疑错误比破坏更具有信息性。

代码语言:javascript
复制
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}

编辑:上述设置确实要求您的Kafka实例是正确配置的。这是有可能的领导选举永远不会解决,在这种情况下,KafkaJS仍然会抱怨领导选举!

在我的经验中,这是由于卡夫卡经纪人在没有被动物园管理员注销注册的情况下被阻止的,这意味着动物园管理员正在等待某种不再存在的东西的回应。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63566301

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档