在rx-java2中有Maybe.repeat()/Flowable.repeat()运算符。但是如果可能/Flowable为空,它只会无限期地重新订阅它,并且既不会发出值,也不会发出完成状态。如果上一次订阅确实发出了某些内容,我如何才能重新订阅?
在项目处理机中,可以使用repeatWhen()运算符:
someMono.repeatWhen { it.takeWhile { i -> i > 0 } }但是在rx-java2中,这个操作符不是这样工作的。所以目前我会在可能为空的情况下发出一个错误,然后从这个错误中恢复:
someMaybe
.switchIfEmpty(Maybe.error(MyStopException()))
.repeat()
.onErrorResumeNext { th: Throwable ->
if (th is MyStopException)
Flowable.empty()
else
Flowable.error(th)
}有没有更好/更自然的方法来做这件事?
发布于 2019-08-25 05:59:34
对于Maybe,您可以这样做:
someMaybe
.flatMapPublisher(value ->
Flowable.just(value).repeat()
)
.subscribe(...);这样,如果someMaybe为空,则不会调用Flowable.just(value).repeat()。
对于可流动的情况,您可以这样做:
AtomicBoolean empty = new AtomicBoolean(true);
someFlowable
.doOnNext(value -> empty.set(false))
.repeatWhen(completed ->
completed.takeUntil(__ -> { return empty.get(); })
)
.subscribe(...);可能还有更好的方法,但我想不出任何其他不涉及实现自定义运算符的方法。
https://stackoverflow.com/questions/56368361
复制相似问题