首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有最大并发性的异步并发队列

具有最大并发性的异步并发队列
EN

Stack Overflow用户
提问于 2017-12-08 17:54:55
回答 2查看 3.9K关注 0票数 3

我正在运行一个带有自定义异步队列的bug,该队列每次调用10个异步函数。

我用50个作业启动队列,一旦完成了前10个任务,队列就会移动到随后的10个,直到全部完成为止。

我遇到的问题是,一旦它完成了50,它就会从最初的5个作业开始,每次有2个或3个或1个作业。在队列结束时,它所需的作业也不到10个。

请创建这两个文件并使用mocha进行测试,并自己查看输出。

注意事项:将mocha中的超时设置为0,以使测试长时间运行。

Queue.js

代码语言:javascript
复制
function Queue(func, max) {
    this.jobs = [];
    this.func = func;
    this.max = max ? max : 10;
}

Queue.prototype.push = function(data) {
    var self = this;
    return new Promise(function(resolve, reject){
        self.jobs.push({data: data, resolve: resolve, reject: reject});
        if(!self.progress) {
            self.progress = true;
            self.run();
        }
    });
};

Queue.prototype.run = function() {
    var self = this;
    var tasks = [];

    console.log("--------------------");

    for(var i=0; i<this.jobs.length && i < this.max; i++) {
        tasks.push(this.jobs.shift());
        console.log("queuing", tasks[tasks.length-1].data);
    }
    console.log("Total jobs queued", tasks.length);

    Promise.all(
        tasks.map(function(task){
            return self.func(task.data)
                .then(task.resolve, task.reject);
        }
    )).then(this.next.bind(this));
};

Queue.prototype.next = function(){
    if(this.jobs.length) {
        this.run();
    } else {
        this.progress = false;
    }
};

module.exports = Queue;

QueueTest.js

代码语言:javascript
复制
function async(data) {
    return new Promise(function(resolve, reject){
        setTimeout(function(){
            console.log("resolving", data);
            resolve(data);
        }, Math.random() * 5000);
    });
}

it("should test queue", function(done){
    var queue = new Queue(async);
    Promise.all(
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
            30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50].map(queue.push.bind(queue))
    ).then(function(){
        done();
    });
});
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-12-10 02:56:30

问题在于for循环在Queue.prototype.run中。

我无法立即理解为什么它应该以它所做的方式不正确的行为,但一个修复是用self.jobs.splice()替换self.jobs.splice()循环来创建tasks数组。

代码语言:javascript
复制
Queue.prototype.run = function() {
    console.log("--------------------");
    var self = this;
    var tasks = self.jobs.splice(0, self.max); // <<<<<<<< this is the fix
    console.log("tasks:", tasks.map(obj => obj.data));

    Promise.all(
        tasks.map(function(task){
            return self.func(task.data)
            .then(task.resolve, task.reject);
        }
    )).then(this.next.bind(this));
};

没有什么需要改变的。

票数 2
EN

Stack Overflow用户

发布于 2022-02-23 15:06:34

从数组中并行调度任务,而不等待任何允许的线程完成。

代码语言:javascript
复制
const fastQueue = async <T, Q>(
  x: T[],
  threads: number,
  fn: (v: T, i: number, a: T[]) => Promise<Q>
) => {
  let k = 0;
  const result = Array(x.length) as Q[];
  await Promise.all(
    [...Array(threads)].map(async () => {
      while (k < x.length) result[k] = await fn(x[k], k++, x);
    })
  );
  return result;
};

const demo = async () => {
    const wait = (x: number) => new Promise(r => setTimeout(r, x, x))
    console.time('a')
    console.log(await fastQueue([1000, 2000, 3000, 2000, 2000], 4, (v) => wait(v)))
    console.timeEnd('a')
}
demo();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47719780

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档