我试图使用kafkajs从Kafka获取消息,我想将所有消息捕获到" messages“(一个数组) config中--在这里保存信任的变量是我使用的函数:
const { Kafka, logLevel } = require('kafkajs')
async function consume_messages(config) {
const kafka = new Kafka({
logLevel: logLevel.INFO,
brokers: [config.broker],
ssl: true,
sasl: {
mechanism: [config.mechanism],
username: config.Username,
password: config.Password
},
})
const topic = config.client_id
const consumer = kafka.consumer({
groupId: 'my-group', fromBeginning: true
})
await consumer.connect();
await consumer.subscribe({
topics: [topic],
fromBeginning: true
})
let messages = []
await consumer.run({
eachMessage: async ({ message }) => {
messages.push(message)
console.log('RECEIVED MESSAGE', JSON.parse(message.value), message.offset);
}
})
})
await consumer.disconnect();
return messages ;
}当我运行它时,我没有得到任何“消息”,但是,当我从“等待consumer.run”({“-> "consumer.run({”->“consumer.run))中移除等待时,"messages”仍然是空的,但只有在整个脚本结束之后,它才能工作。
我如何强制它运行并等待所有消息被获取?
发布于 2022-09-13 06:41:04
唯一对我起作用的是在运行()结束后添加一个暂停。我必须说,我对这个解决方案不满意,因为我强迫我的一方等待固定的5秒,不管结果如何。
我会很高兴找到一个更好的解决方案--但这里是:我添加了一个延迟函数:
function delay(time) {
return new Promise(resolve => setTimeout(resolve, time));
}然后,在运行“等待consumer.run(.”)之后在"consumer.disconnect()“之前,我这样称呼它:
await consumer.run({
eachMessage: async ({ message }) => {
messages.push(message)
console.log('RECEIVED MESSAGE', JSON.parse(message.value), message.offset);
}
})
})
/*Here is where the code waits for messages to be pulled */
await delay(5000)
consumer.disconnect();https://stackoverflow.com/questions/73679848
复制相似问题