问题:远程系统重新连接到多个节点的websocket服务器,为每个系统创建/使用RabbitMQ中的专用队列。如果不存在活动连接,则应自动删除队列。Websocket连接/断开事件处理程序是异步的,相当繁重,观察到断开事件处理程序在重新连接后完成的问题,使系统不一致。
主要的问题是RabbitMQ队列-最初的解决方案是为每个连接创建唯一的队列,并在断开连接时删除它们。看起来很重。
第二种方法是为每个远程系统保留一个专用队列(对于任何连接都使用相同的队列名称),问题是assertQueue为同一队列添加了使用者。需要找到在不删除队列本身的情况下删除陈旧队列使用者的方法。
发布于 2021-01-14 19:09:48
解决方案是使用最老的consumerTag存储每个远程系统的消费者列表和on disconnect event trigger cancel函数,然后更新给定远程系统的队列消费者列表。
在远程系统连接事件上
import { Replies } from "amqplib";
// bind callback function for queue-specific messages and store returned consumer description
const result: Replies.Consume = await channel.consume(queueName, this.onSomeMessage.bind(this));
// update consumers list for the connected remote system
const consumers: Array<string> | undefined = this.consumers.get(remoteId);
if (consumers === undefined) {
const consumersList: Array<string> = new Array();
consumersList.push(result.consumerTag);
this.consumers.set(remoteId, consumersList);
} else {
consumers.push(result.consumerTag);
}发生远程系统断开连接事件
// remove the oldest consumer in the list and update the list itself
// use cancel method of the amqp channel
const consumers = this.consumers.get(remoteId);
if (consumers === undefined) {
// shouldn't happen
console.error(`consumers list for ${remoteId} is empty`);
} else {
const consumerTag = consumers[0];
await this.rxchannel.addSetup(async (channel: ConfirmChannel) => {
await channel.cancel(consumerTag);
consumers.shift();
});
}这些代码片段来自一些类的方法实现(如果你想知道"this")。
版权声明(尤其是对于德国同事):这个答案中的代码可以在啤酒软件(https://en.wikipedia.org/wiki/Beerware)或麻省理工学院的许可下使用(无论你喜欢什么)。
https://stackoverflow.com/questions/65717885
复制相似问题