我正在使用Node.js编写一个HTTP,下面是我试图实现的目标:
task的异步函数实现的。task函数的两次执行不能同时执行。每个执行都应该在另一个执行开始之前运行到完成。代码如下所示:
// only a single execution of this function is allowed at a time
// which is not the case with the current code
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});我在https://github.com/piec/question.js放了一个完整的示例项目
如果我在围棋中,我会像这一样做这件事,这很容易,但我不知道如何使用Node.js。
我已经考虑或尝试过的想法:
task库的互斥体将异步互斥放在关键部分。但我不太喜欢在js代码中添加互斥变量。task (task是异步的)。谢谢!
发布于 2021-10-08 02:51:33
您可以创建自己的序列化异步队列,并通过该队列运行任务。
这个队列使用一个标志来跟踪它是否已经在运行一个异步操作。如果是这样的话,它只是将任务添加到队列中,并在完成当前操作时运行它。如果没有,它现在就运行它。将其添加到队列中将返回一个承诺,这样调用方就可以知道任务何时才能运行。
如果任务是异步的,则需要它们返回链接到异步活动的承诺。您也可以混合使用非异步任务,并且它们也将被序列化。
class SerializedAsyncQueue {
constructor() {
this.tasks = [];
this.inProcess = false;
}
// adds a promise-returning function and its args to the queue
// returns a promise that resolves when the function finally gets to run
add(fn, ...args) {
let d = new Deferred();
this.tasks.push({ fn, args: ...args, deferred: d });
this.check();
return d.promise;
}
check() {
if (!this.inProcess && this.tasks.length) {
// run next task
this.inProcess = true;
const nextTask = this.tasks.shift();
Promise.resolve(nextTask.fn(...nextTask.args)).then(val => {
this.inProcess = false;
nextTask.deferred.resolve(val);
this.check();
}).catch(err => {
console.log(err);
this.inProcess = false;
nextTask.deferred.reject(err);
this.check();
});
}
}
}
const Deferred = function() {
if (!(this instanceof Deferred)) {
return new Deferred();
}
const p = this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.then = p.then.bind(p);
this.catch = p.catch.bind(p);
if (p.finally) {
this.finally = p.finally.bind(p);
}
}
let queue = new SerializedAsyncQueue();
// utility function
const sleep = function(t) {
return new Promise(resolve => {
setTimeout(resolve, t);
});
}
// only a single execution of this function is allowed at a time
// so it is run only via the queue that makes sure it is serialized
async function task(reason: string) {
function runIt() {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
return queue.add(runIt);
}
// call task regularly
setIntervalAsync(async () => {
await task("ticker");
}, 5000) // normally 1min
// call task immediately
app.get("/task", async (req, res) => {
await task("trigger");
res.send("ok");
});发布于 2021-10-08 02:12:07
下面是一个使用RxJS#Subject的版本,它几乎可以工作。如何完成它取决于您的用例。
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<string>();
const effect$ = run.pipe(
// Limit one task at a time
concatMap(task),
share()
);
const effectSub = effect$.subscribe();
interval(5000).subscribe(_ =>
run.next("ticker")
);
// call task immediately
app.get("/task", async (req, res) => {
effect$.pipe(
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next("trigger");
});这里的问题是,res.send("ok")是链接到effect$流下一个发射。这可能不是您将要调用的run.next生成的。
有很多方法可以解决这个问题。例如,您可以使用ID标记每个发射,然后在使用res.send("ok")之前等待相应的发射。
还有更好的方法,如果电话是自然区分的。
笨重的ID版本
随机生成一个ID不是一个好主意,但它会让整个过程变得更加复杂。您可以生成任何您喜欢的唯一ID。它们可以以某种方式直接集成到任务中,或者以100%的方式保持独立(任务本身不知道在运行之前分配了一个ID )。
interface IdTask {
taskId: number,
reason: string
}
interface IdResponse {
taskId: number,
response: any
}
async function task(reason: string) {
console.log("do thing because %s...", reason);
await sleep(1000);
console.log("done");
}
const run = new Subject<IdTask>();
const effect$: Observable<IdResponse> = run.pipe(
// concatMap only allows one observable at a time to run
concatMap((eTask: IdTask) => from(task(eTask.reason)).pipe(
map((response:any) => ({
taskId: eTask.taskId,
response
})as IdResponse)
)),
share()
);
const effectSub = effect$.subscribe({
next: v => console.log("This is a shared task emission: ", v)
});
interval(5000).subscribe(num =>
run.next({
taskId: num,
reason: "ticker"
})
);
// call task immediately
app.get("/task", async (req, res) => {
const randomId = Math.random();
effect$.pipe(
filter(({taskId}) => taskId == randomId),
take(1)
).subscribe(_ =>
res.send("ok")
);
run.next({
taskId: randomId,
reason: "trigger"
});
});https://stackoverflow.com/questions/69488943
复制相似问题