我正在用retrials实现DB更新方法。遵循retryWhen()操作符的通用模式,如下所述:Using Rx Java retryWhen() .
..But我的重试逻辑从不执行。我正在调试它,可以看到断点击中了place 3,但它永远不会返回到place 2的重试逻辑。在place 3之后,它总是转到place 4,这是onComplete处理程序。
(代码使用Java8lambdas)
通过完全删除retryWhen()块,现在从订阅的> onError()块递归调用updateWithRetrials(),我应用了一种解决方法。这是可行的,但我不喜欢这种方法。
请有人建议当我使用retryWhen()运算符时什么是不正确的?
private void updateWithRetrials(some input x)
{
AtomicBoolean retryingUpdate = new AtomicBoolean(false);
...
// 1- Start from here
Observable.<JsonDocument> just(x).map(x1 -> {
if (retryingUpdate.get())
{
//2. retry logic
}
//doing sth with x1 here
...
return <some observable>;
})
.retryWhen(attempts -> attempts.flatMap(n -> {
Throwable cause = n.getThrowable();
if (cause instanceof <errors of interest>)
{
// 3 - break-point hits here
// retry update in 1 sec again
retryingUpdate.set(true);
return Observable.timer(1, TimeUnit.SECONDS);
}
// fail in all other cases...
return Observable.error(n.getThrowable());
}))
.subscribe(
doc -> {
//.. update was successful
},
onError -> {
//for unhandled errors in retryWhen() block
},
{
// 4. onComplete block
Sysout("Update() call completed.");
}
); //subscribe ends here
}发布于 2015-03-29 20:23:12
我认为错误发生在map内部,因为它不能发生在just中。这不是retryWhen的工作方式。
使用create实现您的可观察性,并确保在map中不会出现错误。如果在create块中抛出任何错误,则将调用retryWhen,并根据重试逻辑重新尝试工作单元。
Observable.create(subscriber -> {
// code that may throw exceptions
}).map(item -> {
// code that will not throw any exceptions
}).retryWhen(...)
...发布于 2015-03-30 20:21:35
您的问题是由于使用Observable.just()进行了一些性能优化。
此操作符在发送项目后,不检查订阅是否被取消,并在所有情况下发送onComplete。
Observable.retryWhen (和retry)重新订阅错误,但当源发送onComplete时终止。
因此,即使retry操作符重新订阅,它也会从以前的订阅中获取onComplete并停止。
您可能会看到,下面的代码失败了(与您的代码一样):
@Test
public void testJustAndRetry() throws Exception {
AtomicBoolean throwException = new AtomicBoolean(true);
int value = Observable.just(1).map(v->{
if( throwException.compareAndSet(true, false) ){
throw new RuntimeException();
}
return v;
}).retry(1).toBlocking().single();
}但是,如果你“别忘了”去检查订阅,那就有效了!:
@Test
public void testCustomJust() throws Exception {
AtomicBoolean throwException = new AtomicBoolean(true);
int value = Observable.create((Subscriber<? super Integer> s) -> {
s.onNext(1);
if (!s.isUnsubscribed()) {
s.onCompleted();
}
}
).map(v -> {
if (throwException.compareAndSet(true, false)) {
throw new RuntimeException();
}
return v;
}).retry(1).toBlocking().single();
Assert.assertEquals(1, value);
}https://stackoverflow.com/questions/29324886
复制相似问题