我有一个外部和内部的Observable。内部Obersvable有时会出现错误,这些错误可以通过重试来处理。
function sm$(val) {
if (Math.random() > .4) {
return Rx.Observable.throw(val)
} else {
return Rx.Observable.of(val)
}
}
function sm(val) {
return Rx.Observable.of(val)
.switchMap(sm$)
.catch(() => Rx.Observable.of(val).delay(1000).switchMap(sm))
}
Rx.Observable
.interval(500)
.switchMap(sm)
.take(5)
.subscribe(
val => console.log("val:", val),
err => console.log("err:", err),
() => console.log("complete")
)<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>
这一日志即:
val: 1
val: 2
val: 6
val: 10
val: 11
complete当我返回一个在catch块中可观察到的延迟值时,它不能正常工作--一些值只是被跳过。没有延迟,它能像预期的那样工作,但这不是我想要的。我想推迟重复的执行。
function sm$(val) {
if (Math.random() > .4) {
return Rx.Observable.throw(val)
} else {
return Rx.Observable.of(val)
}
}
function sm(val) {
return Rx.Observable.of(val)
.switchMap(sm$)
.catch(() => Rx.Observable.of(val).switchMap(sm))
// removed the delay(1000)
}
Rx.Observable
.interval(500)
.switchMap(sm)
.take(5)
.subscribe(
val => console.log("val:", val),
err => console.log("err:", err),
() => console.log("complete")
)<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>
这个日志看起来总是这样,这就是我所期望的:
val: 0
val: 1
val: 2
val: 3
val: 4
complete发布于 2018-04-11 13:41:53
问题是您使用的是Observable.interval(500).switchMap(sm)。
特别是在您的情况下的switchMap。即使在内部可观察到的地方使用delay(1000),源switchMap也会继续发出,这仅仅是因为它在1000 use后发出时已经取消了订阅。
因此,看起来您可以使用concatMap而不是外部switchMap。
function sm$(val) {
if (Math.random() > .4) {
return Rx.Observable.throw(val)
} else {
return Rx.Observable.of(val)
}
}
function sm(val) {
return Rx.Observable.of(val)
.switchMap(sm$)
.catch(() => Rx.Observable.of(val).delay(1000).switchMap(sm))
}
Rx.Observable
.interval(500)
.concatMap(sm)
.take(5)
.subscribe(
val => console.log("val:", val),
err => console.log("err:", err),
() => console.log("complete")
)<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>
发布于 2018-04-11 13:42:00
我认为问题与你使用1000毫秒的延迟有关,同时你在发出5个事件后完成流,每个事件每500毫秒发生一次。
这意味着延迟发射,由于catch块延迟,可以在流完成后到达。
如果你减少延误,你可能会得到你想要的东西。我还添加了一些日志以获得更好的图片。
import {Observable} from 'rxjs';
function sm$(val) {
if (Math.random() > .4) {
return Observable.throw(val)
} else {
return Observable.of(val)
}
}
function sm(val) {
return Observable.of(val)
.switchMap(sm$)
.catch(() => {
console.log('error', val);
return Observable.of(val).delay(10).switchMap(d => sm(d));
})
}
Observable
.interval(500)
.switchMap(sm)
.take(5)
.subscribe(
val => console.log("val:", val),
err => console.log("err:", err),
() => console.log("complete")
)https://stackoverflow.com/questions/49775001
复制相似问题