使用Azure NodeJS库,我无法以每秒5或6条消息的速度从Azure服务总线队列接收消息。这比官方文件建议的要慢几个数量级。我使用的是关闭分区的队列(如这里所建议的,使用WebJob从Azure WebJob轮询Azure服务总线队列),在ReceiveAndDelete模式下读取。实际上,我只是重复调用.receiveQueueMessage()。
基于这个问题(Azure服务总线可伸缩性),似乎其他人在使用NodeJS时每秒也会看到5/6的消息。这是个很难的限制吗?有没有已知的解决办法或优化?
发布于 2015-12-07 05:56:22
您提供的线程中的代码非常棒,它似乎从管道工作流中的Service队列接收消息,在接收完上一条消息后,该队列将接收下一条消息。
正如在场景阶段的高吞吐量队列部分在官方网站上提到的那样,我们可以找到适合您的情况的以下要点:
为了实现这些优化,我们可以利用Azure github回购的样本。
最大限度地提高单个队列的吞吐量。我有一个简单的测试,利用上面的示例代码,并将循环时间设置为10 to。我测试的结果是:在ReceiveAndDelete模式下,它将每秒获得近50条消息;在PeekLock模式下,它将获得大约70条消息,如https://azure.microsoft.com/en-us/documentation/articles/service-bus-nodejs-how-to-use-queues/#receive-messages-from-a-queue所示。
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(connectionString);
function checkForMessages(sbService, queueName, callback) {
sbService.receiveQueueMessage(queueName,{ isPeekLock: true }, function (err, lockedMessage) {
if (err) {
if (err === 'No messages to receive') {
console.log('No messages');
} else {
callback(err);
}
} else {
callback(null, lockedMessage);
}
});
}
function processMessage(sbService, err, lockedMsg) {
if (err) {
console.log('Error on Rx: ', err);
} else {
console.log('Rx: ', lockedMsg);
sbService.deleteMessage(lockedMsg, function(err2) {
if (err2) {
console.log('Failed to delete message: ', err2);
} else {
console.log('Deleted message.');
}
})
}
}
var idx = 0;
function sendMessages(serviceBus, queueName) {
var msg = 'Message # ' + (++idx) + (' '+uuid.v4());
serviceBus.sendQueueMessage(queueName, msg, function (err) {
if (err) {
console.log('Failed Tx: ', err);
} else {
console.log('Sent ' + msg);
}
});
}
var queueName = 'myqueue';
serviceBus.getQueue(queueName, function (err,res) {
if (err) {
console.log('Failed: ', err);
} else {
console.log('current msg count '+ res.MessageCount);
// var t = setInterval(checkForMessages.bind(null, serviceBus, queueName,function(err, lockedMsg){}), 10); //ReceiveAndDelete mode
var t = setInterval(checkForMessages.bind(null, serviceBus, queueName, processMessage.bind(null, serviceBus)), 10); // PeekLock mode
// setInterval(sendMessages.bind(null, serviceBus, queueName), 100);
setTimeout(function(){
clearInterval(t);
console.log('task over');
},1000);
}
}); https://stackoverflow.com/questions/34111382
复制相似问题