首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如果满足任何条件,则触发函数的执行。

如果满足任何条件,则触发函数的执行。
EN

Stack Overflow用户
提问于 2021-10-07 23:44:35
回答 2查看 254关注 0票数 0

我正在使用Node.js编写一个HTTP,下面是我试图实现的目标:

  • 我有一个经常性的任务,我想定期运行,大约每分钟。这个任务是用一个名为task的异步函数实现的。
  • 为了响应API中的调用,我也希望立即调用该任务。
  • task函数的两次执行不能同时执行。每个执行都应该在另一个执行开始之前运行到完成。

代码如下所示:

代码语言:javascript
复制
// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

// call task regularly
setIntervalAsync(async () => {
  await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
  await task("trigger");
  res.send("ok");
});

我在https://github.com/piec/question.js放了一个完整的示例项目

如果我在围棋中,我会像一样做这件事,这很容易,但我不知道如何使用Node.js。

我已经考虑或尝试过的想法:

  • 显然,我可以使用来自task库的互斥体将异步互斥放在关键部分。但我不太喜欢在js代码中添加互斥变量。
  • 许多人似乎都在使用带有工作进程的消息队列库(bee- queue,bullmq,.)但这通常会给诸如redis这样的外部服务添加依赖项。而且,如果我正确的话,代码会更复杂一些,因为我需要一个主入口点和一个工作进程的入口点。而且,您不能像在“正常”单个进程情况下那样轻松地与工作人员共享对象。
  • 我已经尝试了RxJ主题,以使一个生产者消费渠道。但是我不能一次只执行一个task (task是异步的)。

谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-10-08 02:51:33

您可以创建自己的序列化异步队列,并通过该队列运行任务。

这个队列使用一个标志来跟踪它是否已经在运行一个异步操作。如果是这样的话,它只是将任务添加到队列中,并在完成当前操作时运行它。如果没有,它现在就运行它。将其添加到队列中将返回一个承诺,这样调用方就可以知道任务何时才能运行。

如果任务是异步的,则需要它们返回链接到异步活动的承诺。您也可以混合使用非异步任务,并且它们也将被序列化。

代码语言:javascript
复制
class SerializedAsyncQueue {
    constructor() {
        this.tasks = [];
        this.inProcess = false;
    }
    // adds a promise-returning function and its args to the queue
    // returns a promise that resolves when the function finally gets to run
    add(fn, ...args) {
        let d = new Deferred();
        this.tasks.push({ fn, args: ...args, deferred: d });
        this.check();
        return d.promise;
    }
    check() {
        if (!this.inProcess && this.tasks.length) {
            // run next task
            this.inProcess = true;
            const nextTask = this.tasks.shift();
            Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
                this.inProcess = false;
                nextTask.deferred.resolve(val);
                this.check();
            }).catch(err => {
                console.log(err);
                this.inProcess = false;
                nextTask.deferred.reject(err);
                this.check();
            });
        }
    }
}

const Deferred = function() {
    if (!(this instanceof Deferred)) {
        return new Deferred();
    }
    const p = this.promise = new Promise((resolve, reject) => {
        this.resolve = resolve;
        this.reject = reject;
    });
    this.then = p.then.bind(p);
    this.catch = p.catch.bind(p);
    if (p.finally) {
        this.finally = p.finally.bind(p);
    }
}


let queue = new SerializedAsyncQueue();

// utility function
const sleep = function(t) {
    return new Promise(resolve => {
        setTimeout(resolve, t);
    });
}

// only a single execution of this function is allowed at a time
// so it is run only via the queue that makes sure it is serialized
async function task(reason: string) {
    function runIt() {
        console.log("do thing because %s...", reason);
        await sleep(1000);
        console.log("done");
    }
    return queue.add(runIt);
}

// call task regularly
setIntervalAsync(async () => {
    await task("ticker");
}, 5000) // normally 1min

// call task immediately
app.get("/task", async (req, res) => {
    await task("trigger");
    res.send("ok");
});
票数 1
EN

Stack Overflow用户

发布于 2021-10-08 02:12:07

下面是一个使用RxJS#Subject的版本,它几乎可以工作。如何完成它取决于您的用例。

代码语言:javascript
复制
async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

const run = new Subject<string>();
const effect$ = run.pipe(
  // Limit one task at a time
  concatMap(task),
  share()
);
const effectSub = effect$.subscribe();

interval(5000).subscribe(_ => 
  run.next("ticker")
);

// call task immediately
app.get("/task", async (req, res) => {
  effect$.pipe(
    take(1)
  ).subscribe(_ =>
    res.send("ok")
  );
  run.next("trigger");
});

这里的问题是,res.send("ok")是链接到effect$流下一个发射。这可能不是您将要调用的run.next生成的。

有很多方法可以解决这个问题。例如,您可以使用ID标记每个发射,然后在使用res.send("ok")之前等待相应的发射。

还有更好的方法,如果电话是自然区分的。

笨重的ID版本

随机生成一个ID不是一个好主意,但它会让整个过程变得更加复杂。您可以生成任何您喜欢的唯一ID。它们可以以某种方式直接集成到任务中,或者以100%的方式保持独立(任务本身不知道在运行之前分配了一个ID )。

代码语言:javascript
复制
interface IdTask {
  taskId: number,
  reason: string
}

interface IdResponse {
  taskId: number,
  response: any
}

async function task(reason: string) {
  console.log("do thing because %s...", reason);
  await sleep(1000);
  console.log("done");
}

const run = new Subject<IdTask>();
const effect$: Observable<IdResponse> = run.pipe(
  // concatMap only allows one observable at a time to run
  concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
    map((response:any) => ({
      taskId: eTask.taskId,
      response
    })as IdResponse)
  )),
  share()
);
const effectSub = effect$.subscribe({
  next: v => console.log("This is a shared task emission: ", v)
});

interval(5000).subscribe(num => 
  run.next({
    taskId: num,
    reason: "ticker"
  })
);

// call task immediately
app.get("/task", async (req, res) => {
  const randomId = Math.random();
  effect$.pipe(
    filter(({taskId}) => taskId == randomId),
    take(1)
  ).subscribe(_ =>
    res.send("ok")
  );
  run.next({
    taskId: randomId,
    reason: "trigger"
  });
});
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69488943

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档