我想更多地了解RxJS的概念。这就是我目前正在努力解决的问题。我希望将访问抽象为具有异步调用的DB,并且希望同步这些访问。
我能有一系列的动作吗,
对db执行异步调用的
示例:
类的用户调用Action1:读取数据库项,计算下一个状态(例如增量字段),写入数据库
然后..。
类的用户调用下一步操作(Action2),但Action1仍在进行中。
Action2:读DB (动作1:写完成前不能启动)
如何使用RxJS+Typescript做到这一点呢?
弗兰克
/
同时,我有以下代码:
import * as Rx from 'rxjs';
var actionQueue = new Rx.Subject< () => Rx.Observable<any>>();
actionQueue
.concatMap( v => v() )
.subscribe( v => {});
// example action with result type number
function action1 ( v : number ) : Rx.Observable<number> {
console.log( ':: action1: ', v );
var res = new Rx.Subject<number>();
actionQueue.next( () => {
console.log( '>> action1: ', v );
setTimeout( ()=>{
console.log( '<< action1: ', v );
res.next(v);
res.complete();
}, 500 );
return res;
});
return res;
}
// some actions enqueue now, after 700+2500ms
action1( 11 ).subscribe( v => console.log( 'XX action1: ', v ));
action1( 22 ).subscribe( v => console.log( 'XX action1: ', v ));
action1( 33 ).subscribe( v => console.log( 'XX action1: ', v ));
setTimeout( ()=>{
action1( 44 ).subscribe( v => console.log( 'XX action1: ', v ));
}, 700 );
setTimeout( ()=>{
action1( 55 ).subscribe( v => console.log( 'XX action1: ', v ));
}, 2500 );输出显示它是按顺序执行的。
作为typescript/js noob...这段代码有缺陷吗?有没有更优雅的方式?
弗兰克
发布于 2017-02-15 09:03:54
使用delayWhen()运算符怎么样?
// Observable wrapping action1.
const obsAction1 = Observable.create(observer => {
// Read DB item
// Calculate next state
// Write to DB
// Then:
observer.complete();
});
// Private observable wrapping action2.
// DO NOT subscribe to it directly.
const _obsAction2 = Observable.create(observer => {
// Read DB
// Then:
observer.complete();
});
// Public observable wrapping action2 AND delayed by action 1.
// This is what the client code should subscribe to.
const obsAction2 = _obsAction2.delayWhen(obsAction1);现在使用可观察对象的代码如下:
obsAction1.subscribe(val => console.log(val));
// Values will be received only when `obsAction1` emits or completes.
obsAction2.subscribe(val => console.log(val));https://stackoverflow.com/questions/42214772
复制相似问题