我对rxjava有一个非常具体的问题或误解,希望有人能帮上忙。
我正在运行rxjava 2.1.5,并有以下代码片段:
public static void main(String[] args) {
final Observable<Object> observable = Observable.create(emitter -> {
// Code ...
});
observable.subscribeOn(Schedulers.io())
.retryWhen(error -> {
System.out.println("retryWhen");
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"));
}执行此操作后,程序将打印:
retryWhen
Process finished with exit code 0我的问题是,我不明白的是:为什么retryWhen在订阅可观察到的内容时立即被调用?可观察的人什么也不做。
我想要的是当在发射器上调用retryWhen时,onError被调用。我是不是误解了rx的工作方式?
谢谢!
添加新片段:
public static void main(String[] args) throws InterruptedException {
final Observable<Object> observable = Observable.create(emitter -> {
emitter.onNext("next");
emitter.onComplete();
});
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribeOn(Schedulers.io())
.doOnError(error -> System.out.println("doOnError: " + error.getMessage()))
.retryWhen(error -> {
System.out.println("retryWhen: " + error.toString());
return error.retry();
}).subscribe(next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"),
() -> latch.countDown());
latch.await();
}发射器onNext和完成称为。DoOnError从未被调用过。产出如下:
io.reactivex.subjects.SerializedSubject@35fb3008 subscribeNext :retryWhen
进程已完成,退出代码为0
发布于 2017-10-25 08:33:10
当一个retryWhen订阅它时,Observer会调用它提供的函数,这样你就有了一个主序列和一个发射主序列失败的Throwable的序列。您应该将一个逻辑组合到在这个Observable中得到的Function上,因此在最后,一个Throwable将在另一端产生一个值。
Observable.error(new IOException())
.retryWhen(e -> {
System.out.println("Setting up retryWhen");
int[] count = { 0 };
return e
.takeWhile(v -> ++count[0] < 3)
.doOnNext(v -> { System.out.println("Retrying"); });
})
.subscribe(System.out::println, Throwable::printStackTrace);因为e -> { }函数体是针对每个单独的订阅者执行的,所以您可以有一个每个订阅服务器状态,例如安全地重试计数器。
使用e -> e.retry()没有任何效果,因为输入错误流从未得到它的onError调用。
发布于 2017-10-25 07:46:53
一个问题是,您没有收到任何结果,因为您正在创建一个线程使用retryWhen(),但您的应用程序似乎已经完成。要查看这种行为,您可能需要一个while循环来保持应用程序的运行。
这实际上意味着您需要将类似的内容添加到代码的末尾:
while (true) {}另一个问题是,您不会在样本中发出任何错误。您至少需要发出一个值来调用onNext(),否则它不会重复,因为它正在等待它。
这是一个工作的例子,它的值,然后它发出一个错误和重复。您可以使用
.retryWhen(errors -> errors)这和
.retryWhen(errors -> errors.retry())工作样本:
public static void main(String[] args) {
Observable
.create(e -> {
e.onNext("test");
e.onError(new Throwable("test"));
})
.retryWhen(errors -> errors.retry())
.subscribeOn(Schedulers.io())
.subscribe(
next -> System.out.println("subscribeNext"),
error -> System.out.println("subscribeError"),
() -> System.out.println("onCompleted")
);
while (true) {
}
}您需要发出一个结果的原因是,可观察到的需要发出一个值,否则它会等到收到一个新的值。
这是因为onError只能被称为onec (在订阅中),但是onNext发出1..*值。
您可以通过使用doOnError()来检查这种行为,每次错误重试可观察到时,它都会提供错误。
Observable
.create(e -> e.onError(new Exception("empty")))
.doOnError(e -> System.out.println("error received " + e))
.retryWhen(errors -> errors.retry())
.subscribeOn(Schedulers.io())
.subscribe(
nextOrSuccess -> System.out.println("nextOrSuccess " + nextOrSuccess),
error -> System.out.println("subscribeError")
);https://stackoverflow.com/questions/46926208
复制相似问题