首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >NodeJS快递API -票务/排队系统

NodeJS快递API -票务/排队系统
EN

Stack Overflow用户
提问于 2021-09-20 14:45:08
回答 1查看 718关注 0票数 0

在结尾处重新措辞,

NodeJS通过GRPC与其他API通信。

每个外部API都有自己的与Node的专用GRPC连接,每个专用的GRPC连接都有一个可以同时服务的并发客户端的上限(例如,外部API 1的上限为30个用户)。

每个对速成API的请求,都可能需要与外部API 1External 2External 3 (从现在起,EAP1、EAP2等)进行通信,而且该快速API还具有一个可供EAP2使用的并发客户端(例如100个客户端)的上限。

所以,我是如何考虑解决这个问题的:

  1. 客户端向Express API提出新的请求。
  2. 中间件queueManager为客户端创建一张票证(把它看作是一张批准对系统访问的票据--它拥有客户端的基本数据(例如名称))
  3. 客户端获取票证,创建一个事件侦听器,该侦听器以其票证ID作为事件名称(当系统准备接受票证时,它将产生票证ID为事件),并输入"Lobby“,其中客户端只等待他们的票证ID被接受/宣布(事件)。

我的问题是,我无法真正考虑如何实现系统跟踪票证的方式,以及如何根据系统的并发客户端拥有队列。

在允许客户端访问系统之前,系统本身应该:

  1. 检查Express API是否达到了并发客户端->的上限(如果是这样的话),应该等到新的票务位置可用时再进行检查。
  2. 如果有一个新的职位可用,它应该检查票证,并找出它需要联系哪个API。例如,如果它需要联系EAP1,它应该检查有多少当前客户端使用GRPC连接。这已经实现了(每个外部API都在一个拥有所需所有信息的类下)。如果EAP1已经达到了上限,那么NodeJS应该稍后再试(但是,多久之后呢?)我是否应该在系统完成对EAP1的另一个请求之后发出一个系统事件?)

我知道公牛,但我不确定它是否符合我的要求。我真正需要做的是让客户端排队,并且:

  1. 检查Express API是否已达到并发用户的上限。
  • 如果某个位置是空的,则从票证的数组中弹出一张票证
  1. 检查EAPx是否已达到并发用户的上限。
  • 如果为真,请尝试另一张需要与其他EAP通信的票证(如果可用)。
  • 如果为假,则授予访问权

编辑:另一个想法可能是有两个公牛队列。一个用于Express API (其中选项“并发”可以设置为Express API的上限)和一个用于EAP。每个EAP队列将有一个不同的工作人员(以设置上限)。

改称为

为了更好地描述这一问题,我将尝试重新表述需求。

对该系统的一个简单看法可以是:

我已经使用了Clem的建议(RabbitMQ),但同样,我不能实现与限制(上限)的并发。

所以,

  1. 客户向TicketHandler索取一张票。为了让TicketHandler构建一个新的票据,客户端和其他信息一起提供了一个callback
代码语言:javascript
复制
TicketHandler.getTicket(variousInfo, function () {
    next();
  })

系统将使用回调来允许客户端与EAP连接。

  1. TickerHandler得到了票: )将其添加到队列中。 当可以访问票证(没有到达上限)时,它会询问适当的EAP处理程序,客户端是否可以使用GRPC连接。如果是,那么请TicketHandler锁定一个位置,然后调用票证的可用回调(从步骤1) (如果不是),TicketHandler将检查下一个需要与其他EAP联系的可用票证。这种情况应该一直持续到TicketHandler首先通知TicketHandler“没有可用的位置”,然后向TicketHandler发送一条消息,以便通知TicketHandler“现在有X个可用职位”(或“1个可用位置”)。然后TicketHandler,应该检查之前无法访问EAPx的票证,并再次询问EAPx是否可以访问GRPC连接。
EN

回答 1

Stack Overflow用户

发布于 2021-09-20 17:16:32

从你的描述中,我明白了以下几点:

  • 你有一个Node.js前端层.每个Node.js框需要被限制为最多100个客户端。
  • 您有一个未定义的后台层,它与前面层中的框具有GRPC连接(让我们称它们为EAP)。每个EAP <-> Node.js GRPS链接仅限于N个并发连接。

我在这里看到的只是服务器级别和连接级别的限制,因此我认为没有理由让任何分布式系统(比如Bull)来管理队列(如果Node.js框死了,就没有人能够恢复HTTP上下文来提供对特定请求的响应--因此,当一个Node.js框死亡时,对其请求的响应就不会更有用)。

考虑到这一点,我只需创建一个本地队列(就像数组一样简单)来管理您的队列。

免责声明:这必须被视为伪代码,以下内容是简化和未经测试的。

这可能是一个队列实现:

代码语言:javascript
复制
interface SimpleQueueObject<Req, Res> {
  req: Req;
  then: (Res) => void;
  catch: (any) => void;
}


class SimpleQueue<Req = any, Res = any> {

  constructor(
    protected size: number = 100,
    /** async function to be executed when a request is de-queued */
    protected execute: (req: Req) => Promise<Res>,
    /** an optional function that may ba used to indicate a request is
     not yet ready to be de-queued. In such case nex request will be attempted */
    protected ready?: (req: Req) => boolean,
  ) { }

  _queue: SimpleQueueObject<Req, Res>[] = [];
  _running: number = 0;

  private _dispatch() {
    // Queues all available
    while (this._running < this.size && this._queue.length > 0) {
      // Accept
      let obj;
      if (this.ready) {
        const ix = this._queue.findIndex(o => this.ready(o.req));
        // todo : this may cause queue to stall (for now we throw error)
        if (ix === -1) return;
        obj = this._queue.splice(ix, 1)[0];
      } else {
        obj = this._queue.pop();
      }
      // Execute
      this.execute(obj.req)
        // Resolves the main request
        .then(obj.then)
        .catch(obj.catch)
        // Attempts to queue something else after an outcome from EAP
        .finally(() => {
          this._running --;
          this._dispatch();
        });
      obj.running = true;
      this._running ++;
    }
  }

  /** Queue a request, fail if queue is busy */
  queue(req: Req): Promise<Res> {
    if (this._running >= this.size) {
      throw "Queue is busy";
    }

    // Queue up
    return new Promise<Res>((resolve, reject) => {
      this._queue.push({ req, then: resolve, catch: reject });
      this._dispatch();
    });
  }

  /** Queue a request (even if busy), but wait a maximum time
   * for the request to be de-queued */
  queueTimeout(req: Req, maxWait: number): Promise<Res> {
    return new Promise<Res>((resolve, reject) => {
      const obj: SimpleQueueObject<Req, Res> = { req, then: resolve, catch: reject };
      // Expire if not started after maxWait
      const _t = setTimeout(() => {
        const ix = this._queue.indexOf(obj);
        if (ix !== -1) {
          this._queue.splice(ix, 1);
          reject("Request expired");
        }
      }, maxWait);
      // todo : clear timeout
      // Queue up
      this._queue.push(obj);
      this._dispatch();
    })
  }

  isBusy(): boolean {
    return this._running >= this.size;
  }

}

然后,您的Node.js业务逻辑可能会执行如下操作:

代码语言:javascript
复制
const EAP1: SimpleQueue = /* ... */;
const EAP2: SimpleQueue = /* ... */;

const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
  100,
  // Forward request to EAP
  async req => {
    if (req.forEap1) {
      // Example 1: this will fail if EAP1 is busy
      return EAP1.queue(req);
    } else if (req.forEap2) {
      // Example 2: this will fail if EAP2 is busy and the request can not
      // be queued within 200ms
      return EAP2.queueTimeout(req, 200);
    }
  }
)

app.get('/', function (req, res) {
  // Forward request to ingress queue
  INGRESS.queue(req)
    .then(r => res.status(200).send(r))
    .catch(e => res.status(400).send(e));
})

或者,此解决方案将允许您(根据请求)也接受繁忙的EAP请求(最多可达100个),并在它们准备就绪时发送:

代码语言:javascript
复制
const INGRESS: SimpleQueue = new SimpleQueue<any, any>(
  100,
  // Forward request to EAP
  async req => {
    if (req.forEap1) {
      return EAP1.queue(req);
    } else if (req.forEap2) {
      return EAP2.queue(req);
    }
  },
  // Delay queue for busy consumers
  req => {
    if (req.forEap1) {
      return !EAP1.isBusy();
    } else if (req.forEap2) {
      return !EAP2.isBusy();
    } else {
      return true;
    }
  }
)

请注意:

  • 在本例中,当接收到100多个并发请求时,Node.js将开始抛出(在节流时抛出503并不少见)
  • 当您有更多的节流限制(在您的情况下是Node.js和GRPC )时,要小心,因为第一个限制可能会导致秒数减少(考虑接收100个EAP1请求,然后接收10个EAP2请求,Node.js将满是EAP1请求,并拒绝EAP2请求--所有EAP2都不做任何事情)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69256398

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档