我很难一次把整批货都交出来。在我看到的一个示例,在该示例中,它们在处理完批处理后解析单个消息的偏移量。文档中,在处理批处理中的所有消息后,我需要解析偏移量或提交它们。
我能用内置的eachBatchAutoResolve做这个吗?我还需要像resolveOffset这样的东西吗?
发布于 2022-08-12 09:30:01
我想我明白了。使用eachBatchAutoResolve。如果我正确理解了文档,如果storePoints()抛出,它就不会提交。和
await this.redpandaConsumer.run({
eachBatch: async ({ batch, heartbeat, isRunning, isStale }) => {
const points: Points[] = [];
for (const message of batch.messages) {
if (!isRunning() || isStale()) break;
points.push(message);
await heartbeat(); // not sure if needed here
}
await this.storePoints(points); // throws if it fails
await heartbeat();
},
eachBatchAutoResolve: true,
});https://stackoverflow.com/questions/73321714
复制相似问题