在结尾处重新措辞,
NodeJS通过GRPC与其他API通信。
每个外部API都有自己的与Node的专用GRPC连接,每个专用的GRPC连接都有一个可以同时服务的并发客户端的上限(例如,外部API 1的上限为30个用户)。
每个对速成API的请求,都可能需要与外部API 1、External 2或External 3 (从现在起,EAP1、EAP2等)进行通信,而且该快速API还具有一个可供EAP2使用的并发客户端(例如100个客户端)的上限。
所以,我是如何考虑解决这个问题的:
我的问题是,我无法真正考虑如何实现系统跟踪票证的方式,以及如何根据系统的并发客户端拥有队列。

在允许客户端访问系统之前,系统本身应该:
我知道公牛,但我不确定它是否符合我的要求。我真正需要做的是让客户端排队,并且:
编辑:另一个想法可能是有两个公牛队列。一个用于Express API (其中选项“并发”可以设置为Express API的上限)和一个用于EAP。每个EAP队列将有一个不同的工作人员(以设置上限)。
改称为
为了更好地描述这一问题,我将尝试重新表述需求。
对该系统的一个简单看法可以是:

我已经使用了Clem的建议(RabbitMQ),但同样,我不能实现与限制(上限)的并发。
所以,
TicketHandler索取一张票。为了让TicketHandler构建一个新的票据,客户端和其他信息一起提供了一个callbackTicketHandler.getTicket(variousInfo, function () {
next();
})系统将使用回调来允许客户端与EAP连接。
发布于 2021-09-20 17:16:32
从你的描述中,我明白了以下几点:
我在这里看到的只是服务器级别和连接级别的限制,因此我认为没有理由让任何分布式系统(比如Bull)来管理队列(如果Node.js框死了,就没有人能够恢复HTTP上下文来提供对特定请求的响应--因此,当一个Node.js框死亡时,对其请求的响应就不会更有用)。
考虑到这一点,我只需创建一个本地队列(就像数组一样简单)来管理您的队列。
免责声明:这必须被视为伪代码,以下内容是简化和未经测试的。
这可能是一个队列实现:
interface SimpleQueueObject<Req, Res> {
req: Req;
then: (Res) => void;
catch: (any) => void;
}
class SimpleQueue<Req = any, Res = any> {
constructor(
protected size: number = 100,
/** async function to be executed when a request is de-queued */
protected execute: (req: Req) => Promise<Res>,
/** an optional function that may ba used to indicate a request is
not yet ready to be de-queued. In such case nex request will be attempted */
protected ready?: (req: Req) => boolean,
) { }
_queue: SimpleQueueObject<Req, Res>[] = [];
_running: number = 0;
private _dispatch() {
// Queues all available
while (this._running < this.size && this._queue.length > 0) {
// Accept
let obj;
if (this.ready) {
const ix = this._queue.findIndex(o => this.ready(o.req));
// todo : this may cause queue to stall (for now we throw error)
if (ix === -1) return;
obj = this._queue.splice(ix, 1)[0];
} else {
obj = this._queue.pop();
}
// Execute
this.execute(obj.req)
// Resolves the main request
.then(obj.then)
.catch(obj.catch)
// Attempts to queue something else after an outcome from EAP
.finally(() => {
this._running --;
this._dispatch();
});
obj.running = true;
this._running ++;
}
}
/** Queue a request, fail if queue is busy */
queue(req: Req): Promise<Res> {
if (this._running >= this.size) {
throw "Queue is busy";
}
// Queue up
return new Promise<Res>((resolve, reject) => {
this._queue.push({ req, then: resolve, catch: reject });
this._dispatch();
});
}
/** Queue a request (even if busy), but wait a maximum time
* for the request to be de-queued */
queueTimeout(req: Req, maxWait: number): Promise<Res> {
return new Promise<Res>((resolve, reject) => {
const obj: SimpleQueueObject<Req, Res> = { req, then: resolve, catch: reject };
// Expire if not started after maxWait
const _t = setTimeout(() => {
const ix = this._queue.indexOf(obj);
if (ix !== -1) {
this._queue.splice(ix, 1);
reject("Request expired");
}
}, maxWait);
// todo : clear timeout
// Queue up
this._queue.push(obj);
this._dispatch();
})
}
isBusy(): boolean {
return this._running >= this.size;
}
}然后,您的Node.js业务逻辑可能会执行如下操作:
const EAP1: SimpleQueue = /* ... */;
const EAP2: SimpleQueue = /* ... */;
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
// Example 1: this will fail if EAP1 is busy
return EAP1.queue(req);
} else if (req.forEap2) {
// Example 2: this will fail if EAP2 is busy and the request can not
// be queued within 200ms
return EAP2.queueTimeout(req, 200);
}
}
)
app.get('/', function (req, res) {
// Forward request to ingress queue
INGRESS.queue(req)
.then(r => res.status(200).send(r))
.catch(e => res.status(400).send(e));
})或者,此解决方案将允许您(根据请求)也接受繁忙的EAP请求(最多可达100个),并在它们准备就绪时发送:
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
100,
// Forward request to EAP
async req => {
if (req.forEap1) {
return EAP1.queue(req);
} else if (req.forEap2) {
return EAP2.queue(req);
}
},
// Delay queue for busy consumers
req => {
if (req.forEap1) {
return !EAP1.isBusy();
} else if (req.forEap2) {
return !EAP2.isBusy();
} else {
return true;
}
}
)请注意:
https://stackoverflow.com/questions/69256398
复制相似问题