首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafkajs -获取统计数据(滞后)

Kafkajs -获取统计数据(滞后)
EN

Stack Overflow用户
提问于 2021-08-06 14:40:53
回答 2查看 765关注 0票数 2

在我们的nest.js应用程序中,我们为kafka使用了卡夫卡伊客户机。我们需要得到机会监控统计数据。衡量标准之一是lag

试图找出卡夫卡伊是否提供了任何有趣的东西。(有效载荷中最有趣的是:timestampoffsetbatchContext.firstOffsetbatchContext.firstTimestampbatchContext.maxTimestamp)

问题

有没有办法记录lag值和kafkajs提供的其他统计数据?

我是否应该考虑在使用kafka.js客户端的节点应用程序中实现自己的统计监视器来收集所需的信息?

新细节1

遵循文档,我可以得到batch.highWatermark,其中

batch.highWatermark是主题分区内最后提交的偏移量。它可以用于计算滞后。

正在尝试

代码语言:javascript
复制
  await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

我可以像下一个那样获得信息:

代码语言:javascript
复制
Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

那么在标签计算中如何使用batch.highWatermark有什么想法吗?

EN

回答 2

Stack Overflow用户

发布于 2022-08-01 09:14:48

看来,获得偏移滞后度量的唯一方法是使用仪器事件

代码语言:javascript
复制
consumer.on(consumer.events.END_BATCH_PROCESS, (payload) =>
  console.log(payload.offsetLagLow),
);

offsetLagLow测量批处理中的第一条消息和分区中的最后一条偏移量(highWatermark)之间的偏移量增量。您也可以使用offsetLag,但它基于批处理的最后偏移量。

正如@Sergii所提到的,当您使用eachBatch时,有一些道具可以直接使用(这里batch支柱上的所有可用方法)。但是如果你使用eachMessage,你就得不到那个道具了。所以仪器事件是最普遍的方法。

票数 0
EN

Stack Overflow用户

发布于 2021-08-28 08:33:02

一般来说,所描述的配置工作正常。通过使用eachMessage属性,其他配置损坏的工作如下:

代码语言:javascript
复制
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
})

因此,同时,使用者配置应该只配置一个属性eachBatcheachMessage

票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68683467

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档