我想使用node.js 公牛处理计划好的作业。基本上,我有两个处理器,处理两种类型的作业。有一个配置器,它配置将使用cron添加到牛队列中的作业。
调度程序将在一个微服务中,每个处理器将是一个单独的微服务。所以我将有3个微型服务。
我的问题是,我是否对公牛使用了正确的模式?
index.js
const Queue = require('bull');
const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});processor-configurator.js
const Queue=require('bull');
const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");fetcher-configurator.js
const Queue=require('bull');
const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");fetcher.js
const Queue = require('bull');
const moment = require('moment');
module.exports = function (job) {
const scheduler = new Queue('MyScheduler');
console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
return Promise.resolve()
};alert-processor.js
const Queue = require('bull');
const moment = require('moment');
module.exports = function (job) {
const scheduler = new Queue('MyScheduler');
console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
return Promise.resolve()
};我们会有三个微型服务-
我从公牛身上看到了不一致的行为。有时,我会得到缺少作业类型的错误处理程序。
发布于 2019-09-23 19:32:25
引用我自己的话,希望这对别人有帮助。
这是因为两个工作人员使用相同的队列。工作者试图从队列中获取下一个作业,接收错误类型的作业(例如“取取器”而不是“处理器”),但由于它知道如何处理“处理器”而不知道如何处理“取器”而失败。Bull不允许您只从队列中获取兼容的作业,这两个工作人员都应该能够处理所有类型的作业。最简单的解决方案是使用两个不同的队列,一个用于处理器,另一个用于获取器。然后您可以从作业和处理器中删除名称,由于名称是由队列定义的,因此不再需要它。
发布于 2021-07-08 20:57:35
公牛:
expiration-queue.js
import Queue from 'bull';
import { ExpirationCompletePublisher } from '../events/publishers/expiration-complete-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
orderId: string;
}
const expirationQueue = new Queue<Payload>('order:expiration', {
redis: {
host: process.env.REDIS_HOST,
},
});
expirationQueue.process(async (job) => {
console.log('Expiries order id', job.data.orderId);
new ExpirationCompletePublisher(natsWrapper.client).publish({
orderId: job.data.orderId,
});
});
export { expirationQueue };promotionEndQueue.js
import Queue from 'bull';
import { PromotionEndedPublisher } from '../events/publishers/promotion-ended-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
promotionId: string;
}
const promotionEndQueue = new Queue<Payload>('promotions:end', {
redis: {
host: process.env.REDIS_HOST, // look at expiration-depl.yaml
},
});
promotionEndQueue.process(async (job) => {
console.log('Expiries promotion id', job.data.promotionId);
new PromotionEndedPublisher(natsWrapper.client).publish({
promotionId: job.data.promotionId,
});
});
export { promotionEndQueue };order-created-listener.js
import { Listener, OrderCreatedEvent, Subjects } from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { expirationQueue } from '../../queues/expiration-queue';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
subject: Subjects.OrderCreated = Subjects.OrderCreated;
queueGroupName = queueGroupName;
async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
// delay = expiredTime - currentTime
const delay = new Date(data.expiresAt).getTime() - new Date().getTime();
// console.log("delay", delay)
await expirationQueue.add(
{
orderId: data.id,
},
{
delay,
}
);
msg.ack();
}
}promotion-started-listener.js
import {
Listener,
PromotionStartedEvent,
Subjects,
} from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { promotionEndQueue } from '../../queues/promotions-end-queue';
export class PromotionStartedListener extends Listener<PromotionStartedEvent> {
subject: Subjects.PromotionStarted = Subjects.PromotionStarted;
queueGroupName = queueGroupName;
async onMessage(data: PromotionStartedEvent['data'], msg: Message) {
// delay = expiredTime - currentTime
const delay = new Date(data.endTime).getTime() - new Date().getTime();
// console.log("delay", delay)
await promotionEndQueue.add(
{
promotionId: data.id,
},
{
delay,
}
);
msg.ack();
}
}https://stackoverflow.com/questions/58049541
复制相似问题