我使用以下代码初始化Kafka客户端:
this.kafka = new Kafka({
clientId: <my_client_ID>,
brokers: [
`${process.env.KAFKA_BROKER_1}`,
`${process.env.KAFKA_BROKER_2}`,
`${process.env.KAFKA_BROKER_3}`,
],
retry: {
initialRetryTime: 3000,
retries: 3,
},
});现在,如果连接到代理有问题,它会抛出如下错误:
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_1>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.144Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":299}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_2>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:09.447Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":564}
{"level":"ERROR","timestamp":"2022-10-19T04:21:08.143Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"<broker_3>","clientId":"<my_client_id"}
{"level":"ERROR","timestamp":"2022-10-19T04:21:11.014Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1008}现在,我想在这里更改日志消息,或者在这种情况发生后添加一条自定义消息。我正在考虑将它封装在一个试图捕获块中,但出于某种原因,它没有抛出异常。所以:
谢谢。
发布于 2022-11-09 08:43:49
据我所知,您希望添加自定义日志来处理某些错误场景。
卡夫卡提供了一种方法来创建我们自己的自定义日志,并使用它们代替默认日志。
这里是相同的参考。您可以检查您的kafkajs版本,以避免兼容性问题。在下面添加了一个示例。
{
level: 4,
label: 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG
timestamp: '2017-12-29T13:39:54.575Z',
logger: 'kafkajs',
message: 'Started',
// ... any other extra key provided to the log function
}
const { logLevel } = require('kafkajs')
const winston = require('winston')
const toWinstonLogLevel = level => switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return 'error'
case logLevel.WARN:
return 'warn'
case logLevel.INFO:
return 'info'
case logLevel.DEBUG:
return 'debug'
}
const WinstonLogCreator = logLevel => {
const logger = winston.createLogger({
level: toWinstonLogLevel(logLevel),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'myapp.log' })
]
})
return ({ namespace, level, label, log }) => {
const { message, ...extra } = log
logger.log({
level: toWinstonLogLevel(level),
message,
extra,
})
}
}
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR,
logCreator: WinstonLogCreator
})谢谢。
https://stackoverflow.com/questions/74159188
复制相似问题