我在Kafkajs的消费者方面有优势,有时我会犯一个再平衡错误:
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining然后,一旦使用者组被重新平衡,被处理的最后一条消息将再次被处理,因为由于错误而没有发生提交。
Kafka消费者初始化代码:
import { Consumer, Kafka } from 'kafkajs';
const kafkaInstance = new Kafka({
clientId: 'some_client_id',
brokers: ['brokers list'],
ssl: true
});
const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: ... some processing function
});我将sessionTimeout和heartbeatInteval增加到更高的值和不同的组合,但是仍然在重消息负载的情况下,我得到了错误。
我在heartbeat函数中添加了对eachMessage函数的调用,这似乎解决了这个问题。
但我想知道它是否被认为是“良好做法”,还是我可以在消费者方面做些什么来防止这样的错误呢?
发布于 2022-04-20 07:27:25
我在heartbeat函数中添加了一个对eachMessage函数的调用,这似乎解决了这个问题。
https://stackoverflow.com/questions/71689242
复制相似问题