首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有Node库的Azure服务总线队列的最大吞吐量?

有Node库的Azure服务总线队列的最大吞吐量?
EN

Stack Overflow用户
提问于 2015-12-05 21:56:04
回答 1查看 1.4K关注 0票数 0

使用Azure NodeJS库,我无法以每秒5或6条消息的速度从Azure服务总线队列接收消息。这比官方文件建议的要慢几个数量级。我使用的是关闭分区的队列(如这里所建议的,使用WebJob从Azure WebJob轮询Azure服务总线队列),在ReceiveAndDelete模式下读取。实际上,我只是重复调用.receiveQueueMessage()。

基于这个问题(Azure服务总线可伸缩性),似乎其他人在使用NodeJS时每秒也会看到5/6的消息。这是个很难的限制吗?有没有已知的解决办法或优化?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-12-07 05:56:22

您提供的线程中的代码非常棒,它似乎从管道工作流中的Service队列接收消息,在接收完上一条消息后,该队列将接收下一条消息。

正如在场景阶段的高吞吐量队列部分在官方网站上提到的那样,我们可以找到适合您的情况的以下要点:

  • 若要提高队列的总体接收速率,请使用多个消息工厂创建接收方。
  • 使用异步操作来利用客户端批处理。
  • 将批处理间隔设置为50 Bus,以减少服务总线客户端协议传输的数量。如果使用多个发送器,则将批处理间隔增加到100 to。

为了实现这些优化,我们可以利用Azure github回购的样本

最大限度地提高单个队列的吞吐量。我有一个简单的测试,利用上面的示例代码,并将循环时间设置为10 to。我测试的结果是:在ReceiveAndDelete模式下,它将每秒获得近50条消息;在PeekLock模式下,它将获得大约70条消息,如https://azure.microsoft.com/en-us/documentation/articles/service-bus-nodejs-how-to-use-queues/#receive-messages-from-a-queue所示。

代码语言:javascript
复制
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(connectionString);

function checkForMessages(sbService, queueName, callback) {
  sbService.receiveQueueMessage(queueName,{ isPeekLock: true }, function (err, lockedMessage) {

    if (err) {
      if (err === 'No messages to receive') {
        console.log('No messages');
      } else {
        callback(err);
      }
    } else {
      callback(null, lockedMessage);
    }
  });
}

function processMessage(sbService, err, lockedMsg) {
  if (err) {
    console.log('Error on Rx: ', err);
  } else {
    console.log('Rx: ', lockedMsg);
    sbService.deleteMessage(lockedMsg, function(err2) {
      if (err2) {
        console.log('Failed to delete message: ', err2);
      } else {
        console.log('Deleted message.');
      }
    })
  }
}

var idx = 0;
function sendMessages(serviceBus, queueName) {
  var msg = 'Message # ' + (++idx) + (' '+uuid.v4());
  serviceBus.sendQueueMessage(queueName, msg, function (err) {
   if (err) {
     console.log('Failed Tx: ', err);
   } else {
     console.log('Sent ' + msg);
   }
  });
}


var queueName = 'myqueue';
serviceBus.getQueue(queueName, function (err,res) {
  if (err) {
   console.log('Failed: ', err);
  } else {
  console.log('current msg count '+ res.MessageCount);
   // var t = setInterval(checkForMessages.bind(null, serviceBus, queueName,function(err, lockedMsg){}), 10);  //ReceiveAndDelete mode
   var t = setInterval(checkForMessages.bind(null, serviceBus, queueName, processMessage.bind(null, serviceBus)), 10);  // PeekLock mode
   // setInterval(sendMessages.bind(null, serviceBus, queueName), 100);
                setTimeout(function(){
                                clearInterval(t);
                                console.log('task over');
                   },1000);
  }
}); 
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34111382

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档