首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在重新连接时使用node-amqplib取消订阅特定队列

在重新连接时使用node-amqplib取消订阅特定队列
EN

Stack Overflow用户
提问于 2021-01-14 19:09:48
回答 1查看 177关注 0票数 0

问题:远程系统重新连接到多个节点的websocket服务器,为每个系统创建/使用RabbitMQ中的专用队列。如果不存在活动连接,则应自动删除队列。Websocket连接/断开事件处理程序是异步的,相当繁重,观察到断开事件处理程序在重新连接后完成的问题,使系统不一致。

主要的问题是RabbitMQ队列-最初的解决方案是为每个连接创建唯一的队列,并在断开连接时删除它们。看起来很重。

第二种方法是为每个远程系统保留一个专用队列(对于任何连接都使用相同的队列名称),问题是assertQueue为同一队列添加了使用者。需要找到在不删除队列本身的情况下删除陈旧队列使用者的方法。

EN

回答 1

Stack Overflow用户

发布于 2021-01-14 19:09:48

解决方案是使用最老的consumerTag存储每个远程系统的消费者列表和on disconnect event trigger cancel函数,然后更新给定远程系统的队列消费者列表。

在远程系统连接事件上

代码语言:javascript
复制
import { Replies } from "amqplib";

// bind callback function for queue-specific messages and store returned consumer description
const result: Replies.Consume = await channel.consume(queueName, this.onSomeMessage.bind(this)); 

// update consumers list for the connected remote system
const consumers: Array<string> | undefined = this.consumers.get(remoteId);
if (consumers === undefined) {
  const consumersList: Array<string> = new Array();
  consumersList.push(result.consumerTag);
  this.consumers.set(remoteId, consumersList);
} else {
  consumers.push(result.consumerTag);
}

发生远程系统断开连接事件

代码语言:javascript
复制
// remove the oldest consumer in the list and update the list itself
// use cancel method of the amqp channel
const consumers = this.consumers.get(remoteId);
if (consumers === undefined) {  
  // shouldn't happen
  console.error(`consumers list for ${remoteId} is empty`);
} else {
  const consumerTag = consumers[0];
  await this.rxchannel.addSetup(async (channel: ConfirmChannel) => {
    await channel.cancel(consumerTag);
    consumers.shift();
  });
}

这些代码片段来自一些类的方法实现(如果你想知道"this")。

版权声明(尤其是对于德国同事):这个答案中的代码可以在啤酒软件(https://en.wikipedia.org/wiki/Beerware)或麻省理工学院的许可下使用(无论你喜欢什么)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65717885

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档