我有以下 Node.js 程序，它从 SQS 队列中检索大量网站 URL，并使用 wkhtmltoimage 对其截屏（每次 15 个）：
// Question code, as originally posted: screenshot URLs taken from an SQS
// queue with wkhtmltoimage, intended to run at most `concurrency` jobs at once.
// NOTE(review): several promise chains below are started but never returned
// from the callbacks that create them, so nothing waits for them. That is why
// more than `concurrency` wkhtmltoimage processes end up running in parallel
// (see the notes on the individual lines).
// NOTE(review): as posted, the snippet appears to be missing the `});` that
// closes the `.then(function(data) {` callback opened below.
var concurrency=15;
// Break the (approximate) queue size down into batches of `concurrency` messages.
var getTotalNumberMessages = sqs.getQueueAttributesAsync({
QueueUrl: queueUrl,
AttributeNames: ['All']
}).then(function(data) {
// One empty slot per full batch. Math.floor drops the remainder
// (ApproximateNumberOfMessages % concurrency), and with fewer than
// `concurrency` messages the array is empty and nothing is processed.
var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));
// NOTE(review): Bluebird's Promise.each only waits for a promise that the
// iterator returns. This iterator returns nothing (the Promise.each chain
// below is not returned), so every batch is started at once instead of one
// after another.
Promise.each(total, function (value, index, length) {
// Number of receiveMessage calls per batch: with concurrency = 15 this is
// floor(15 / 10) + 1 = 2 calls of up to 10 messages each (see
// MaxNumberOfMessages), i.e. up to 20 messages, more than `concurrency`.
var toDo = Array(Math.floor(concurrency / 10) + 1);
// NOTE(review): `messages` (and `done` below) are never declared, so they are
// implicit globals shared by all concurrently running batches; every batch
// resets the same array here.
messages = [];
var gmPromises = [];
// The receive promises are collected in gmPromises and awaited with
// Promise.all below; because the iterator returns nothing, the receive calls
// are issued without waiting for one another.
Promise.each(toDo, function (value, index, length) {
gmPromises.push(
sqs.receiveMessageAsync({
QueueUrl: queueUrl,
WaitTimeSeconds: 20,
VisibilityTimeout: 120,
MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
}).then(function (data) {
// NOTE(review): presumably `data.Messages` can be absent when no message is
// received, in which case this line throws - confirm against the SDK docs.
if (data.Messages.length == 0) {
// `done` is assigned but never read anywhere in this snippet.
done = true;
} else {
messages = messages.concat(data.Messages);
}
})
);
}).then(function() {
// NOTE(review): this Promise.all chain is not returned, so the enclosing
// chain does not wait for the screenshots either.
Promise.all(gmPromises).then(function () {
var promises = [];
// The `concurrency: 15` option limits only this one Promise.map call. Since
// all batches run at the same time, the total number of wkhtmltoimage
// processes is not limited to 15. The Promise.map result is not returned.
Promise.map(messages, function (message) {
// NOTE(review): `s3key` and `body` are not defined in this snippet;
// `message.Body` is never parsed here.
var tmpFilename = '/tmp/' + md5(s3key) + '.png';
// This local `process` shadows Node's global `process` inside this callback.
var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
process.stdout.on('data', function (data) {
console.log(data.toString());
});
process.stderr.on('data', function (err) {
console.log(err.toString());
});
return new Promise(function (resolve, reject) {
process.on('exit', function (code) {
// NOTE(review): on a non-zero exit code this promise never settles
// (`reject` is never called anywhere).
if (code == 0) {
fs.readFile(tmpFilename, {}, function (err, data) {
if (err) {
// Thrown from an async callback: this does not reject the promise,
// it becomes an uncaught exception.
throw err;
} else {
var fileData = Buffer.from(data, 'binary');
var s3 = new AWS.S3();
s3.putObject({
Bucket: 'mybucket',
Key: s3key,
Body: fileData,
ACL: 'public-read'
}, function (err, resp) {
// NOTE(review): the upload error `err` is ignored; the message is
// deleted and success is logged even if putObject failed.
var deleteMessagePromise = sqs.deleteMessage({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
deleteMessagePromise.catch(function (err) {
console.log('SQS deleteMessage failed: ', err, err.stack);
});
promises.push(deleteMessagePromise);
console.log(arguments);
console.log('Successfully uploaded package.');
resolve();
});
}
}
);
}
});
});
}, {
concurrency: 15
});
// NOTE(review): this runs synchronously right after Promise.map is called,
// before any deleteMessage promise has been pushed (that happens later in the
// putObject callback), so `promises` is still empty here.
return Promise.all(promises);
});
});
});然而,我发现有超过15个wkhtmltoimage映像并行运行。从SQS中检索15批消息似乎是并行的,尽管我使用的是蓝鸟的Promise.each?
发布于 2018-01-17 08:55:39
我上面那段代码的问题在于：
由于所有相关代码都是异步调用，队列中的所有项目都会被并行处理，而这并不是代码的本意。我希望限制并发数，从而让 wkhtmltoimage 只占用有限（x 量）的内存。
我需要的是：从队列中接收 15 个项目，然后停止接收，直到有 wkhtmltoimage 进程完成为止；之后再继续接收项目，直到再次有 15 个进程在运行。
经过大量研究，我发现这正是 Node.js 流在暂停（paused）模式下所做的事情。我还找到了一个将 SQS 队列封装成流的 Node.js 包：sqs-readable-stream。
这是我的最终代码：
// Screenshot every URL in the SQS queue with wkhtmltoimage, keeping at most
// `concurrency` jobs in flight. The queue is consumed as a readable stream:
// the stream is paused once `concurrency` messages are being worked on and
// resumed whenever a job finishes, so new messages are only pulled from SQS
// when there is spare capacity.
//
// Assumes `sqs`, `queueUrl`, `concurrency`, `id`, `befAft`, `md5`, `spawn`,
// `fs`, `AWS` and `SQSReadableStream` are defined earlier in the program.
var s3 = new AWS.S3(); // one client, reused for every upload

var sqsStream = new SQSReadableStream({
  sqsClient: sqs,
  queueUrl: queueUrl
});

// Messages whose job (screenshot + upload) is still running.
var collect = [];

sqsStream.on('data', function (message) {
  collect.push(message);
  if (collect.length >= concurrency) {
    sqsStream.pause();
  }

  var released = false;
  // Free this message's slot exactly once, whichever way the job ends
  // (success, wkhtmltoimage failure, read error, upload error). Without this,
  // every failed job would keep its slot forever and, after `concurrency`
  // failures, the stream would stay paused for good.
  function release() {
    if (released) {
      return;
    }
    released = true;
    var index = collect.indexOf(message);
    if (index !== -1) {
      collect.splice(index, 1);
    }
    if (collect.length < concurrency) {
      sqsStream.resume();
    }
  }

  var body;
  try {
    body = JSON.parse(message.Body);
  } catch (err) {
    // A malformed body must not crash the consumer. The message is not
    // deleted, so SQS can deliver it again after its visibility timeout.
    console.log('Could not parse message body: ', err);
    release();
    return;
  }

  var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';
  var tmpFilename = '/tmp/' + md5(s3key) + '.png';
  // Named `child` rather than `process` so Node's global `process` is not shadowed.
  var child = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
  console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);

  // The output is not used, but it is still consumed so the child never
  // blocks on a full stdout/stderr pipe.
  child.stdout.resume();
  child.stderr.resume();

  // Emitted e.g. when the binary cannot be spawned; 'exit' may not follow.
  child.on('error', function (err) {
    console.log('wkhtmltoimage could not be started: ', err);
    release();
  });

  child.on('exit', function (code) {
    if (code !== 0) {
      // Non-zero exit code, or null when the child was killed by a signal.
      console.log('wkhtmltoimage exited with code ' + code + ' for ' + body.url);
      fs.unlink(tmpFilename, function () {}); // best effort: may not exist
      release();
      return;
    }
    fs.readFile(tmpFilename, function (err, data) {
      // Remove the temp file either way so /tmp does not fill up over a long
      // run (best effort; errors are ignored).
      fs.unlink(tmpFilename, function () {});
      if (err) {
        // Log instead of throwing: a throw here would be an uncaught
        // exception that takes down the whole process.
        console.log('Could not read screenshot ' + tmpFilename + ': ', err);
        release();
        return;
      }
      // readFile without an encoding already yields a Buffer, so it is
      // uploaded as-is.
      s3.putObject({
        Bucket: 'somebucket',
        Key: s3key,
        Body: data
      }, function (err, resp) {
        if (err) {
          // Do not delete: SQS can deliver the message again after its
          // visibility timeout, so the screenshot is retried, not lost.
          console.log('S3 upload failed for ' + s3key + ': ', err);
        } else {
          message.deleteMessage();
          console.log(resp);
          console.log('Successfully uploaded package.');
        }
        release();
      });
    });
  });
});
// https://stackoverflow.com/questions/48216969
复制相似问题