我们使用@azure/service-bus包处理来自多个主题的消息批处理。
我们用于每2秒从主题中获取20条消息的代码如下所示。
let isProcessing: boolean = false;
setInterval(async () => {
if (isProcessing === false) {
isProcessing = true;
try {
const messages: Array<ServiceBusMessage>
= await receiver.receiveMessages(Configuration.SB.batchSize as number);
if (messages.length > 0) {
this.logger.info(`[SB] ${topic} - ${messages.length} require processing`);
await Promise.all([
...messages.map(message => this.handleMsg(receiver, message, topic, moduleRef, handler))
]).catch(error => {
this.logger.error(error.message, error);
});
}
isProcessing = false;
} catch (error) {
this.logger.error(error.message, error);
isProcessing = false;
}
}
}, Configuration.SB.tickInterval as number);我的问题是-这是最好的方法吗?有更好的方法吗?,它可以工作,而且性能相当好,但是我认为我们有时会丢失receiveAndDelete消息,如果它是我们的实现的话,我正在尝试解决它。
谢谢你的帮助
发布于 2019-11-19 17:31:12
,它可以工作,而且性能相当好,但是我认为我们有时会丢失receiveAndDelete消息,如果它实现的话,我正在尝试解决它。
有两种接收消息的模式
使用ReceiveAndDelete
PeekLock的
当使用ReceiveAndDelete模式时,客户端接收消息时,它们将自动从服务器中删除。所以这最多是一次送货。
使用PeekLock,一个消息被“租用”给客户机最多5分钟,客户端要么通过请求消息完成来确认成功的处理,要么通过取消/死信(如果无法处理)来确认成功的处理。如果这些操作都没有在定义的租约时间内进行(这不需要严格地说是5分钟,而且可能更短),则会重新尝试该消息,直到超过最大传递尝试数(MaxDeliveryCount),并且消息被死信为止。请注意,消息从未丢失。即使它没有经过处理,也是死板的。因此,这是至少一次交付,这可能更适合您的情况。它将对您编写客户端的方式产生轻微的影响,但不会造成很大的变化。
https://stackoverflow.com/questions/58929755
复制相似问题