首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafkajs -“组正在重新平衡,因此需要重新连接”错误导致消息被消耗不止一次。

Kafkajs -“组正在重新平衡,因此需要重新连接”错误导致消息被消耗不止一次。
EN

Stack Overflow用户
提问于 2022-03-31 08:24:47
回答 1查看 3.4K关注 0票数 4

我在Kafkajs的消费者方面有优势,有时我会犯一个再平衡错误:

代码语言:javascript
复制
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining

然后,一旦使用者组被重新平衡,被处理的最后一条消息将再次被处理,因为由于错误而没有发生提交。

Kafka消费者初始化代码:

代码语言:javascript
复制
import { Consumer, Kafka } from 'kafkajs';


const kafkaInstance = new Kafka({
      clientId: 'some_client_id',
      brokers: ['brokers list'],
      ssl: true
    });

const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });

await kafkaConsumer.run({
      autoCommit: false, // cancel auto commit in order to control committing
      eachMessage: ... some processing function
});

我将sessionTimeoutheartbeatInteval增加到更高的值和不同的组合,但是仍然在重消息负载的情况下,我得到了错误。

我在heartbeat函数中添加了对eachMessage函数的调用,这似乎解决了这个问题。

但我想知道它是否被认为是“良好做法”,还是我可以在消费者方面做些什么来防止这样的错误呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-20 07:27:25

我在heartbeat函数中添加了一个对eachMessage函数的调用,这似乎解决了这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71689242

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档