在我们的nest.js应用程序中,我们为kafka使用了卡夫卡伊客户机。我们需要得到机会监控统计数据。衡量标准之一是lag。
试图找出卡夫卡伊是否提供了任何有趣的东西。(有效载荷中最有趣的是:timestamp、offset、batchContext.firstOffset、batchContext.firstTimestamp、batchContext.maxTimestamp)
问题
有没有办法记录lag值和kafkajs提供的其他统计数据?
我是否应该考虑在使用kafka.js客户端的节点应用程序中实现自己的统计监视器来收集所需的信息?
新细节1
遵循文档,我可以得到batch.highWatermark,其中
batch.highWatermark是主题分区内最后提交的偏移量。它可以用于计算滞后。
正在尝试
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async (data) => {
console.log('Received data.batch.messages: ', data.batch.messages)
console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
},
})我可以像下一个那样获得信息:
Received data.batch.messages: [
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '144',
key: null,
value: <Buffer 68 65 6c 6c 6f 21>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '145',
key: null,
value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '146',
key: null,
value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
}
]
Received data.batch.highWatermark: 147那么在标签计算中如何使用batch.highWatermark有什么想法吗?
发布于 2022-08-01 09:14:48
看来,获得偏移滞后度量的唯一方法是使用仪器事件
consumer.on(consumer.events.END_BATCH_PROCESS, (payload) =>
console.log(payload.offsetLagLow),
);offsetLagLow测量批处理中的第一条消息和分区中的最后一条偏移量(highWatermark)之间的偏移量增量。您也可以使用offsetLag,但它基于批处理的最后偏移量。
正如@Sergii所提到的,当您使用eachBatch时,有一些道具可以直接使用(这里是batch支柱上的所有可用方法)。但是如果你使用eachMessage,你就得不到那个道具了。所以仪器事件是最普遍的方法。
发布于 2021-08-28 08:33:02
一般来说,所描述的配置工作正常。通过使用eachMessage属性,其他配置损坏的工作如下:
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
})因此,同时,使用者配置应该只配置一个属性eachBatch或eachMessage。
https://stackoverflow.com/questions/68683467
复制相似问题