我有一个验证过程,它逐行验证表中的数据。因为每一行验证都使用共享资源,所以必须序列化对它的访问。
public validate():Observable<boolean>{
const rowValidations:Observable<boolean>[] = dataRows.map(row=>this.validateSingleRow(row);
return forkJoin(...rowValidations).pipe(
map(results=>results.every(r=>r))
)
}如果我正确理解的话,forkJoin不会像concat那样等待每个可观察到的完成,然后再订阅下一个,这样很可能会失败。另一方面,concat将所有可观察到的数据序列化到一个单一的流中。
如何获得与concat类似的订阅订单,但如何获得每个可观察结果的数组,比如使用forkJoin,有效地同步每个内部可观察到的执行(比如Javas synchronzied validateSingleRow)?
发布于 2021-12-07 11:44:44
实际上,如果您知道每个this.validateSingleRow(row)总是只发出一次,则可以使用toArray()。
concat(...rowValidations).pipe(
toArray(),
);concat将保证正确的顺序,toArray()将收集所有排放到一个数组中,并在源可观测完成后重新发射它。
否则,如果validateSingleRow可能发出多次且始终只希望它的最后一个值,则可以使用scan。
const indexedRowValidations = rowValidations.map((o, index) => o.pipe(
map(result => [index, result]),
));
concat(...indexedRowValidations ).pipe(
scan((acc, [index, result]) => {
acc[index] = result;
return acc;
}, {}),
takeLast(1),
);(我没有测试它,但我相信你有这样的想法)。
发布于 2021-12-07 09:51:56
像这样的事对你有好处吗?
class SomeClass {
dataRows = [1, 2, 3];
public validate(): Observable<boolean[]> {
return this.resolveSequentially(this.dataRows);
}
private validateSequentially<T>([cur, ...obs]: T[]): Observable<boolean[]> {
return cur
? this.validateSingleRow(cur).pipe(
switchMap((x) =>
this.validateSequentially(obs).pipe(map((arr) => [x, ...arr]))
)
)
: of([]);
}
// Mock
private validateSingleRow(cur: any) {
console.log(`Validating ${cur}...`);
return of(Math.floor(Math.random() * 2) === 1).pipe(
delay(1000),
tap((x) => console.log(`Result: ${x}`))
);
}
}
const obj = new SomeClass();
obj.validate().subscribe(console.log);发布于 2021-12-07 11:45:03
满足我要求的解决方案比人们想象的要简单。我在concat中使用过toArray(),像这样
const rowValidations:Observable<boolean>[] = dataRows.map(row=>defer(()=>this.validateSingleRow(row));
return concat(...rowValidations).pipe(
toArray(),
map(results=>results.every(r=>r))
)因此,validateSingleRow逐个执行,toArray将布尔流转换为布尔数组。
https://stackoverflow.com/questions/70256578
复制相似问题