首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用bull的作业处理微服务

使用bull的作业处理微服务
EN

Stack Overflow用户
提问于 2019-09-22 13:09:16
回答 2查看 2.8K关注 0票数 3

我想使用node.js 公牛处理计划好的作业。基本上,我有两个处理器,处理两种类型的作业。有一个配置器,它配置将使用cron添加到牛队列中的作业。

调度程序将在一个微服务中,每个处理器将是一个单独的微服务。所以我将有3个微型服务。

我的问题是,我是否对公牛使用了正确的模式?

index.js

代码语言:javascript
复制
const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

processor-configurator.js

代码语言:javascript
复制
const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

代码语言:javascript
复制
const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");

fetcher.js

代码语言:javascript
复制
const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

alert-processor.js

代码语言:javascript
复制
const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

我们会有三个微型服务-

  1. 节点index.js
  2. 节点获取器-配置器.
  3. 节点处理器-配置器.

我从公牛身上看到了不一致的行为。有时,我会得到缺少作业类型的错误处理程序。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-09-23 19:32:25

引用我自己的话,希望这对别人有帮助。

这是因为两个工作人员使用相同的队列。工作者试图从队列中获取下一个作业,接收错误类型的作业(例如“取取器”而不是“处理器”),但由于它知道如何处理“处理器”而不知道如何处理“取器”而失败。Bull不允许您只从队列中获取兼容的作业,这两个工作人员都应该能够处理所有类型的作业。最简单的解决方案是使用两个不同的队列,一个用于处理器,另一个用于获取器。然后您可以从作业和处理器中删除名称,由于名称是由队列定义的,因此不再需要它。

https://github.com/OptimalBits/bull/issues/1481

票数 6
EN

Stack Overflow用户

发布于 2021-07-08 20:57:35

公牛:

expiration-queue.js

代码语言:javascript
复制
import Queue from 'bull';
import { ExpirationCompletePublisher } from '../events/publishers/expiration-complete-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  orderId: string;
}

const expirationQueue = new Queue<Payload>('order:expiration', {
  redis: {
    host: process.env.REDIS_HOST, 
  },
});

expirationQueue.process(async (job) => {
  console.log('Expiries order id', job.data.orderId);
  new ExpirationCompletePublisher(natsWrapper.client).publish({
    orderId: job.data.orderId,
  });
});

export { expirationQueue };

promotionEndQueue.js

代码语言:javascript
复制
import Queue from 'bull';
import { PromotionEndedPublisher } from '../events/publishers/promotion-ended-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  promotionId: string;
}

const promotionEndQueue = new Queue<Payload>('promotions:end', {
  redis: {
    host: process.env.REDIS_HOST, // look at expiration-depl.yaml
  },
});

promotionEndQueue.process(async (job) => {
  console.log('Expiries promotion id', job.data.promotionId);
  new PromotionEndedPublisher(natsWrapper.client).publish({
    promotionId: job.data.promotionId,
  });
});

export { promotionEndQueue };

order-created-listener.js

代码语言:javascript
复制
import { Listener, OrderCreatedEvent, Subjects } from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { expirationQueue } from '../../queues/expiration-queue';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
  subject: Subjects.OrderCreated = Subjects.OrderCreated;
  queueGroupName = queueGroupName;

  async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.expiresAt).getTime() - new Date().getTime();
    // console.log("delay", delay)
    await expirationQueue.add(
      {
        orderId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}

promotion-started-listener.js

代码语言:javascript
复制
import {
  Listener,
  PromotionStartedEvent,
  Subjects,
} from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { promotionEndQueue } from '../../queues/promotions-end-queue';
export class PromotionStartedListener extends Listener<PromotionStartedEvent> {
  subject: Subjects.PromotionStarted = Subjects.PromotionStarted;
  queueGroupName = queueGroupName;

  async onMessage(data: PromotionStartedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.endTime).getTime() - new Date().getTime();

    // console.log("delay", delay)
    await promotionEndQueue.add(
      {
        promotionId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58049541

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档