我正在编写一个使用Node和kafkajs https://kafka.js.org/docs/consuming实现2个消费者的服务
我有两个异步函数,它初始化了使用者,理论上是挂起来读取消息的。就像这样:
const { Kafka } = require('kafkajs')
// Create the client with the broker list
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092']
});
const consumer = kafka.consumer({ groupId: 'my-group' });
const consumerModule = async (): Promise<void> => {
await consumer.connect();
await consumer.subscribe({ topic: 'topic-A' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
});
}
export default consumerModule;如果我希望使用者在服务的后台运行,我是否应该在初始化服务器时导入模块文件?
我觉得这是行不通的,因为必须调用consumerModule函数,而且它是async,不知道如何正确地实现它。
例如:
import express, { Express } from 'express';
import consumerModuleA from './modules/consumerModuleA';
import consumerModuleB from './modules/consumerModuleB';
async function bootstrap(): Promise<Express> {
const app = express();
app.get('/health', (_req, res) => {
res.send('ok');
});
return app;
}请给我建议。
发布于 2022-02-09 08:49:34
我建议如下。备注:
通过参数化topic/group/etc).
import express, { Express } from 'express';
import consumerModule from './modules/consumerModule';
consumerModule();
async function bootstrap(): Promise<Express> {
const app = express();
app.get('/health', (_req, res) => {
res.send('ok');
});
return app;
}https://stackoverflow.com/questions/71035795
复制相似问题