首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >运行eachMessage的kafkajs不获取消息。

运行eachMessage的kafkajs不获取消息。
EN

Stack Overflow用户
提问于 2022-09-11 13:56:01
回答 1查看 257关注 0票数 0

我试图使用kafkajs从Kafka获取消息,我想将所有消息捕获到" messages“(一个数组) config中--在这里保存信任的变量是我使用的函数:

代码语言:javascript
复制
const { Kafka, logLevel } = require('kafkajs')
async function consume_messages(config) {
    const kafka = new Kafka({
        logLevel: logLevel.INFO,
        brokers: [config.broker],
        ssl: true,
        sasl: {
            mechanism: [config.mechanism], 
            username: config.Username,
            password: config.Password
        },
    })
    const topic = config.client_id
    const consumer = kafka.consumer({
        groupId: 'my-group', fromBeginning: true
    })
    await consumer.connect();
    await consumer.subscribe({
        topics: [topic],
        fromBeginning: true
    })
    let messages = []
    await consumer.run({
        eachMessage: async ({ message }) => {
             messages.push(message)
             console.log('RECEIVED MESSAGE', JSON.parse(message.value), message.offset);
          }
        })
       
    })
    await consumer.disconnect();
    return messages ;
}

当我运行它时,我没有得到任何“消息”,但是,当我从“等待consumer.run”({“-> "consumer.run({”->“consumer.run))中移除等待时,"messages”仍然是空的,但只有在整个脚本结束之后,它才能工作。

我如何强制它运行并等待所有消息被获取?

EN

回答 1

Stack Overflow用户

发布于 2022-09-13 06:41:04

唯一对我起作用的是在运行()结束后添加一个暂停。我必须说,我对这个解决方案不满意,因为我强迫我的一方等待固定的5秒,不管结果如何。

我会很高兴找到一个更好的解决方案--但这里是:我添加了一个延迟函数:

代码语言:javascript
复制
    function delay(time) {
    return new Promise(resolve => setTimeout(resolve, time));
}

然后,在运行“等待consumer.run(.”)之后在"consumer.disconnect()“之前,我这样称呼它:

代码语言:javascript
复制
    await consumer.run({
        eachMessage: async ({ message }) => {
             messages.push(message)
             console.log('RECEIVED MESSAGE', JSON.parse(message.value), message.offset);
          }
        })
           
        })
/*Here is where the code waits for messages to be pulled  */
       await delay(5000)
       consumer.disconnect();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73679848

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档