好的,我想避免使用可观察的递归,使用外部和内部事件的组合,而不是回忆相同的方法/函数。
现在我有一个:
Queue.prototype.deq = function (opts) {
opts = opts || {};
const noOfLines = opts.noOfLines || opts.count || 1;
const isConnect = opts.isConnect !== false;
let $dequeue = this.init()
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
if(obj.error){
// if there is an error acquiring the lock we
// retry after 100 ms, which means using recursion
// because we call "this.deq()" again
return Rx.Observable.timer(100)
.flatMap(() => this.deq(opts));
}
else{
return makeGenericObservable()
.map(() => obj);
}
})
})
.flatMap(obj => {
return removeOneLine(this)
.map(l => ({l: l, id: obj.id}))
})
.flatMap(obj => {
return releaseLock(this, obj.id)
.map(() => obj.l)
})
.catch(e => {
console.error(e.stack || e);
return releaseLock(this);
});
if (isConnect) {
$dequeue = $dequeue.publish();
$dequeue.connect();
}
return $dequeue;
};与上面使用递归(希望是正确的)不同,我想使用一种更公平的方法。如果有一个从acquireLock函数传回的错误,我想每100‘s重试一次,一旦它成功了,我就不知道该如何做,而且我很难测试it....is,对吧?
Queue.prototype.deq = function (opts) {
// ....
let $dequeue = this.init()
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
if(obj.error){
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
.filter(obj => !obj.error)
)
}
else{
// this is just an "empty" observable
// which immediately fires onNext()
return makeGenericObservable()
.map(() => obj);
}
})
})
// ...
return $dequeue;
};有什么办法能让这句话更简洁吗?我也只想重试5次左右。我的主要问题是--如何在间隔旁边创建一个计数,以便每100 ms重试一次,直到获得锁或计数达到5为止?
我需要这样的东西:
.takeUntil(this or that)也许我可以简单地连锁takeUntils,如下所示:
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
.filter(obj => !obj.error)
)
.takeUntil(++count < 5);我可以这么做:
return Rx.Observable.interval(100)
.takeUntil(
acquireLock(this)
.filter(obj => !obj.error)
)
.takeUntil( Rx.Observable.timer(500));但可能是在寻找一些不那么疯狂的东西
但是我不知道在哪里存储/跟踪count变量.
此外,还希望使它更简洁,并可能检查它的正确性。
我不得不说,如果这个东西有效的话,它是非常强大的编码结构。
发布于 2016-12-30 08:13:09
有两个操作符可以帮助您:重试和retryWhen。两个都在源上重新订阅,可以观察到,因此重试失败的操作。
在这个示例中,我们可以观察到在第一次count订阅上失败的情况:
let getObs = (count) => {
return Rx.Observable.create((subs) => {
console.log('Subscription count = ', count);
if(count) {
count--;
subs.error("ERROR");
} else {
subs.next("SUCCESS");
subs.complete();
}
return () => {};
});
};
getObs(2).subscribe(console.log, console.log);
getObs(2).retry(2).subscribe(console.log, console.log);
getObs(3).retry(2).subscribe(console.log, console.log);<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
如你所见:
retry,我们可以,嗯,重试几次,пуе成功响应retry将放弃并沿链传播错误。您实际上需要的是retryWhen,因为retry试图立即再次执行操作。
let getObs = (count) => {
return Rx.Observable.create((subs) => {
if(count) {
count--;
subs.error("ERROR");
} else {
subs.next("SUCCESS");
subs.complete();
}
return () => {};
});
};
getObs(2).retryWhen(errors => errors.delay(100))
.subscribe(console.log, console.log);
getObs(4).retryWhen(errors => errors.delay(100))
.subscribe(console.log, console.log);<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
在retryWhen中添加延迟很容易,但在多次尝试之后强迫它失败是很困难的:
let getObs = (count) => {
return Rx.Observable.create((subs) => {
if(count) {
count--;
subs.error("ERROR");
} else {
subs.next("SUCCESS");
subs.complete();
}
return () => {};
});
};
getObs(2)
.retryWhen(errors => {
return errors.delay(100).scan((errorCount, err) => {
if(!errorCount) {
throw err;
}
return --errorCount;
}, 2);
})
.subscribe(console.log, console.log);
getObs(4)
.retryWhen(errors => {
return errors.delay(100).scan((errorCount, err) => {
if(!errorCount) {
throw err;
}
return --errorCount;
}, 2);
})
.subscribe(console.log, console.log);<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
最后,两次重试都会引发错误,因此在获取锁时需要这样做:
.flatMap(() => {
return acquireLock(this)
.switchMap(obj => {
if(obj.error) {
return Rx.Observable.throw(obj.error);
} else {
Rx.Observable.of(obj);
}
})
.retryWhen(...)
})https://stackoverflow.com/questions/41387633
复制相似问题