形势
我正在使用卡夫卡伊编写一些动态生成的kafka主题。
我发现在注册我的制作人之后,立即给这些主题写信会经常导致一个错误:There is no leader for this topic-partition as we are in the middle of a leadership election。
完全错误是:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}“守则”
造成问题的代码如下:
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()问题
两个问题:
发布于 2020-08-24 18:23:17
解决方案
Kafkajs通过具有可选的createTopics标志的管理客户端提供了一个waitForLeaders方法:
admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}使用它可以解决问题。
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()不幸的是,如果主题已经存在,这将导致一个不同的错误,但这是一个单独的问题,我怀疑错误比破坏更具有信息性。
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}编辑:上述设置确实要求您的Kafka实例是正确配置的。这是有可能的领导选举永远不会解决,在这种情况下,KafkaJS仍然会抱怨领导选举!
在我的经验中,这是由于卡夫卡经纪人在没有被动物园管理员注销注册的情况下被阻止的,这意味着动物园管理员正在等待某种不再存在的东西的回应。
https://stackoverflow.com/questions/63566301
复制相似问题