我正在尝试在BullMQ上创建一个结果队列,所有的工人都可以发送结果,添加带有特殊制作的jobID的作业。这个想法是,所有的结果都会生成一个特定的jobID,这样我就可以确切地知道结果是针对哪个进程的。
我尝试过documentation中描述的getNextJob,但没有成功。
我找到的解决方案是使用queuEvents:每个进程在结果队列的waiting状态上注册一个侦听器,当具有所需id的作业到达时,该进程将获取具有getJob的作业,读取结果数据并尝试将作业移动到已完成状态。它起作用了,我可以正确地抓取工人产生的结果。
我遇到的问题是将结果作业移动到已完成状态,因为我无法使用getJob配置锁令牌,并且收到Missing lock for job错误,作业仍处于活动状态。
这是我在进程中使用的(伪)代码
const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
if (job.jobId === jobID){
debug(`Result job for ${jobID} received!`);
const resJob = await resQueue?.getJob(jobID) as Job;
queueEvents.removeListener('waiting', waitResult);
// THIS GENERATES the error
resJob?.moveToCompleted('Results received', token, false);
resolve(resJob?.data);
}
}
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );有谁知道如何正确处理moveToCompleted
发布于 2021-10-05 03:14:00
您可以开发一个results队列,如下所示:const queue_Results = new Queue('Results');,从那里您可以让一个工作者处理事件,如下所示const worker_Results = new Worker('Results', async (job: Job) => { // do something with the results from other jobs })
关于该方法的BullMQ文档,here
https://stackoverflow.com/questions/69268728
复制相似问题