我将数据文件分成几个月,并对node.js进行集群,以便将作业分成批处理,供不同的线程处理,但我这样做的方式让一些线程没有工作可做,如下所示:
thread 1 selection [ '2004-05', '2004-06', '2004-07', '2004-08' ]
thread 2 selection [ '2004-09', '2004-10', '2004-11', '2004-12' ]
thread 9 selection [ '2007-01', '2007-02', '2007-03', '2007-04' ]
thread 7 selection [ '2006-05', '2006-06', '2006-07', '2006-08' ]
thread 5 selection [ '2005-09', '2005-10', '2005-11', '2005-12' ]
thread 4 selection [ '2005-05', '2005-06', '2005-07', '2005-08' ]
thread 8 selection [ '2006-09', '2006-10', '2006-11', '2006-12' ]
thread 6 selection [ '2006-01', '2006-02', '2006-03', '2006-04' ]
thread 10 selection [ '2007-05', '2007-06', '2007-07', '2007-08' ]
thread 3 selection [ '2005-01', '2005-02', '2005-03', '2005-04' ]
thread 11 selection [ '2007-09', '2007-10', '2007-11', '2007-12' ]
thread 0 selection [ '2004-01', '2004-02', '2004-03', '2004-04' ]
thread 15 selection []
thread 14 selection []
thread 13 selection []
thread 12 selection [ '2008-01', '2008-02', '2008-03' ]看,线程13、14和15没有工作要做,浪费了我机器上的内核。下面是我的代码,忽略集群样板代码,假设i等于线程数(在我的例子中是0-15):
let dateStart = moment('2004-01-02');
let dateEnd = moment('2008-03-02');
let timeValues = [];
while (dateEnd > dateStart || dateStart.format('M') === dateEnd.format('M')) {
timeValues.push(dateStart.format('YYYY-MM'));
dateStart.add(1, 'month');
}
let i = parseInt(process.env.workerId);
let monthBatchCount = Math.ceil(timeValues.length / cpus);
let selectionStart = i * monthBatchCount;
let selectionEnd = selectionStart + monthBatchCount;
let selection = timeValues.slice(selectionStart, selectionEnd)
console.log("thread", i, "selection", selection)我如何改变我的方法来更有效地将作业分发到批处理中,这样就不会留下空批处理的线程?
发布于 2020-06-27 01:02:00
一种方法是让每个worker从主线程中拉出工作单元,而不是将工作推给它们。父线程将作为工作单元的代理工作,而工作线程一旦产生,就会请求工作,执行工作,然后在循环中请求更多的工作。
// Parent code
const unitsOfWork = [...];
const workers = [...];
workers.forEach(worker => {
worker.on('message', (message) => {
if (message.type === 'CLAIM_WORK') {
const unit = unitsOfWork.pop();
const message = unit ? { type: 'WORK', unit } : { type: 'WORK_FINISHED' };
worker.postMessage(message);
}
});
});// Worker code
const { parentPort } = require('worker_threads');
parentPort.on('message', (message) => {
if (message.type === 'WORK') {
performWork(message.unit);
parentPort.postMessage({ type 'CLAIM_WORK' });
} else if (message.type === 'WORK_FINISHED') {
// Exit?
}
});
parentPort.postMessage({ type 'CLAIM_WORK' });发布于 2020-06-27 02:41:25
雅各布的答案实际上是最有效的解决方案,因为如果不是所有的批处理作业都需要相同的时间,他的方法将让提前完成任务的线程返回并完成更多的工作,而不是在任务较难的线程完成时等待。
但如果有人想知道如何正确地将队列划分为批,也许对于其他一些用例,这里就是,我使用了相同的队列弹出原则:
let dateStart = moment('2004-01-02');
let dateEnd = moment('2008-03-02');
let timeValues = [];
while (dateEnd > dateStart || dateStart.format('M') === dateEnd.format('M')) {
timeValues.push(dateStart.format('YYYY-MM'));
dateStart.add(1, 'month');
}
let i = parseInt(process.env.workerId);
let batches = [];
let workerId = 0;
while (timeValues.length > 0) {
if (!batches[workerId]) batches[workerId] = [];
batches[workerId].push(timeValues.pop());
workerId++;
if (workerId > 15) workerId = 0;
}
let batch = batches[i];
console.log("batch", batch)https://stackoverflow.com/questions/62599454
复制相似问题