首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >未在RabbitMQ和节点中使用AMQP.Node的死信消息

未在RabbitMQ和节点中使用AMQP.Node的死信消息
EN

Stack Overflow用户
提问于 2013-07-31 11:32:10
回答 3查看 5.5K关注 0票数 5

我想在一段时间后在我的一个工人那里收到一条信息。在发现所谓的死信交换之后,我决定和Node和RabbitMQ一起去.

消息似乎被发送到DeadExchange中的队列中,但是在WorkQueue中WorkExchange中经过时间之后,使用者永远不会接收消息。要么是bindQueue关闭了,要么是死信不起作用?

我已经尝试了很多不同的价值观。有人能指出我错过了什么吗?

代码语言:javascript
复制
var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', '');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to a random (unique) queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-message-ttl': 2000,
                    'x-expires': 10000
                }
            })
        }).then(function(ok) {
            console.log('Sending delayed message');

            return ch.sendToQueue(ok.queue, new Buffer(':)'));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

我使用的是amqp.node (https://github.com/squaremo/amqp.node),它是npm中的amqplib。虽然节点-amqp (https://github.com/postwait/node-amqp)似乎要流行得多,但它并没有实现完整的协议,而且在重新连接方面也存在一些突出的问题。

dev.rabbitmq.com正在运行RabbitMQ 3.1.3。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2013-08-01 17:32:38

在Channel#assertQueue中,AMQP.Node中有一个bug刚刚修复,参见https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918。修正是在GitHub,但还没有在国家预防机制。

票数 0
EN

Stack Overflow用户

发布于 2015-07-03 12:35:13

这是一个工作的code.When --消息在DeadExchange中花费的时间超过了to,它被推送到WorkExchange。成功的关键是定义正确的路由密钥。您希望发送post ttl的交换队列应该有一个路由密钥(注意:非默认),“x死信路由-密钥”属性值应该与该路由密钥匹配。

代码语言:javascript
复制
var amqp = require('amqplib');
var url = 'amqp://localhost';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to DEQ queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('DEQ', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-dead-letter-routing-key': 'rk1',
                    'x-message-ttl': 15000,
                    'x-expires': 100000
                }
            })
        }).then(function() {
            return ch.bindQueue('DEQ', 'DeadExchange', '');
        }).then(function() {
            console.log('Sending delayed message');

            return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!"));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

票数 7
EN

Stack Overflow用户

发布于 2019-07-01 17:33:41

下面是一个为Node使用AMQP连接管理器的示例。我注意到似乎没有任何示例与我们在代码中所做的工作相匹配,因此我使用一个简单的示例进行了回购,并通过将其重新发布回主exchange:https://github.com/PritchardAlexander/node-amqp-dead-letter-queue进行了重试计数。

下面是一个简单的例子:

代码语言:javascript
复制
const amqp = require('amqp-connection-manager');
const username = encodeURIComponent('queue');
const password = encodeURIComponent('pass');
const port = '5672';
const host = 'localhost';
const connectionString = `amqp://${username}:${password}@${host}:${port}`;

// Ask the connection manager for a ChannelWrapper.  Specify a setup function to
// run every time we reconnect to the broker.
connection = amqp.connect([connectionString]);

// A channel is your ongoing connection to RabbitMQ.
// All commands go through your channel.
connection.createChannel({
  json: true,
  setup: function (channel) {
    channel.prefetch(100);

    // Setup EXCHANGES - which are hubs you PUBLISH to that dispatch MESSAGES to QUEUES
    return Promise.all([
      channel.assertExchange('Test_MainExchange', 'topic', {
        durable: false,
        autoDelete: true,
        noAck: false
      }),
      channel.assertExchange('Test_DeadLetterExchange', 'topic', {
        durable: false,
        autoDelete: true,
        maxLength: 1000,
        noAck: true // This means dead letter messages will not need an explicit acknowledgement or rejection
      })
    ])
    // Setup QUEUES - which are delegated MESSAGES by EXCHANGES.
    // The MESSAGES then need to be CONSUMED.
    .then(() => {
      return Promise.all([
        channel.assertQueue(
          'Test_MainQueue',
          options = {
            durable: true,
            autoDelete: true,
            exclusive: false,
            messageTtl: 1000*60*60*1,
            deadLetterExchange: 'Test_DeadLetterExchange'
          }
        ),
        channel.assertQueue('Test_DeadLetterQueue',
          options = {
            durable: false,
            autoDelete: true,
            exclusive: false
          }
        )
      ]);
    })
    // This glues the QUEUES and EXCHANGES together
    // The last parameter is a routing key. A hash/pound just means: give me all messages in the exchange.
    .then(() => {
      return Promise.all([
        channel.bindQueue('Test_MainQueue', 'Test_MainExchange', '#'),
        channel.bindQueue('Test_DeadLetterQueue', 'Test_DeadLetterExchange', '#')
      ]);
    })
    // Setup our CONSUMERS
    // They pick MESSAGES off of QUEUES and do something with them (either ack or nack them)
    .then(() => {
      return Promise.all([
        channel.consume('Test_MainQueue', (msg) => {
          const stringifiedContent = msg.content ? msg.content.toString() : '{}';
          console.log('Test_MainQueue::CONSUME ' + stringifiedContent);

          const messageData = JSON.parse(stringifiedContent);
          if (messageData.value === 0) {
            console.log('Test_MainQueue::REJECT ' + stringifiedContent);
            // the 'false' param at the very end means, don't retry! dead letter this instead!
            return channel.nack(msg, true, false);
          }
          return channel.ack(msg);
        })
      ]),
      channel.consume('Test_DeadLetterQueue', (msg) => {
        const stringifiedContent = msg.content ? msg.content.toString() : '{}';
        console.log('');
        console.log('Test_DeadLetterQueue::CONSUME ' + stringifiedContent);
        console.log('');
      });
    })
    .then(() => {
      setInterval(function () {
        const messageData = {
          text: 'Dead letter if 0',
          value: Math.floor(Math.random()*5)
        };
        const stringifiedMessage = JSON.stringify(messageData);

        // Publish message to exchange
        if (channel.publish('Test_MainExchange', '', new Buffer(stringifiedMessage))) {
          console.log(`Sent ${stringifiedMessage}`);
        } else {
          console.log(`Failed to send ${stringifiedMessage}`);
        };
      }, 300);
    });
  }
});
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/17969152

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档