首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >wkhtmltoimage,蓝鸟承诺,每次15

wkhtmltoimage,蓝鸟承诺,每次15
EN

Stack Overflow用户
提问于 2018-01-11 22:40:51
回答 1查看 26关注 0票数 0

我有以下nodejs程序从SQS队列中检索大量的网站urls,并使用wkhtmltoimage (每次15个)截屏:

代码语言:javascript
复制
    var concurrency=15;
    //break down queue size into batches of concurrency size

    var getTotalNumberMessages = sqs.getQueueAttributesAsync({
        QueueUrl: queueUrl,
        AttributeNames: ['All']
    }).then(function(data) {

        var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));

        Promise.each(total, function (value, index, length) {

            var toDo = Array(Math.floor(concurrency / 10) + 1);
            messages = [];
            var gmPromises = [];
            Promise.each(toDo, function (value, index, length) {
                gmPromises.push(
                    sqs.receiveMessageAsync({
                        QueueUrl: queueUrl,
                        WaitTimeSeconds: 20,
                        VisibilityTimeout: 120, 
                        MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
                    }).then(function (data) {
                        if (data.Messages.length == 0) {
                            done = true;
                        } else {
                            messages = messages.concat(data.Messages);
                        }
                    })
                );
            }).then(function() {
                Promise.all(gmPromises).then(function () {

                    var promises = [];

                    Promise.map(messages, function (message) {

                        var tmpFilename = '/tmp/' + md5(s3key) + '.png';

                        var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

                        console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
                        process.stdout.on('data', function (data) {
                            console.log(data.toString());
                        });

                        process.stderr.on('data', function (err) {
                            console.log(err.toString());
                        });

                        return new Promise(function (resolve, reject) {
                            process.on('exit', function (code) {
                                if (code == 0) {
                                    fs.readFile(tmpFilename, {}, function (err, data) {
                                            if (err) {
                                                throw err;
                                            } else {
                                                var fileData = Buffer.from(data, 'binary');

                                                var s3 = new AWS.S3();
                                                s3.putObject({
                                                    Bucket: 'mybucket',
                                                    Key: s3key,
                                                    Body: fileData,
                                                    ACL: 'public-read'
                                                }, function (err, resp) {
                                                    var deleteMessagePromise = sqs.deleteMessage({
                                                        QueueUrl: queueUrl,
                                                        ReceiptHandle: message.ReceiptHandle
                                                    }).promise();
                                                    deleteMessagePromise.catch(function (err) {
                                                        console.log('SQS deleteMessage failed: ', err, err.stack);
                                                    });
                                                    promises.push(deleteMessagePromise);

                                                    console.log(arguments);
                                                    console.log('Successfully uploaded package.');
                                                    resolve();
                                                });
                                            }
                                        }
                                    );
                                }
                            });
                        });
                    }, {
                        concurrency: 15
                    });


                    return Promise.all(promises);
                });
            });
        });

然而,我发现有超过15个wkhtmltoimage映像并行运行。从SQS中检索15批消息似乎是并行的,尽管我使用的是蓝鸟的Promise.each?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-01-17 08:55:39

在我上面的代码中,我所做的是

  • 找出我排队的物品有多少
  • 除以并发处理项的数量,->组的数目
  • 对于每一组
    • 一次处理15次
    • 循环到下一个组。

因为所有相关代码都使用异步调用,所以队列中的所有项都将并行处理,这不是代码的意图。相反,我想限制并发性,所以我只使用wkhtmltoimage使用的x内存量

我需要的是从队列中接收15个项目,然后停止接收项目,直到我的wkhtml进程完成为止。然后,我可以接收项目,直到我有15个进程再次运行。

经过相当多的研究,我发现这就是node.js流暂停模式中所做的。我还找到了一个node.js包,它将SQS队列封装到流中:sqs-可读流

这是我的最后代码:

代码语言:javascript
复制
    var sqsStream = new SQSReadableStream({
        sqsClient: sqs,
        queueUrl: queueUrl
    });

    var collect = [];

    sqsStream.on('data', function(message){

        collect.push(message);

        if (collect.length >= concurrency){
            sqsStream.pause();
        }

        var body = JSON.parse(message.Body);

        var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';

        var tmpFilename = '/tmp/' + md5(s3key) + '.png';

        var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

        console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
        process.stdout.on('data', function (data) {
            //console.log(data.toString());
        });

        process.stderr.on('data', function (err) {
            //console.log(err.toString());
        });

        process.on('exit', function (code) {
            if (code == 0) {
                fs.readFile(tmpFilename, {}, function (err, data) {
                        if (err) {
                            throw err;
                        } else {
                            var fileData = Buffer.from(data, 'binary');

                            var s3 = new AWS.S3();
                            s3.putObject({
                                Bucket: 'somebucket',
                                Key: s3key,
                                Body: fileData
                            }, function (err, resp) {
                                message.deleteMessage();
                                console.log(arguments);
                                console.log('Successfully uploaded package.');
                                collect.pop();
                                sqsStream.resume()
                            });
                        }
                    }
                );
            }
        });

    });
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48216969

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档