我有以下rxJava链:
override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
var combinedFlowable = Flowable
.combineLatest(
req,
getLastLocation().lastOrError().toFlowable(),
BiFunction<Place, Location, Place> { t1, location ->
Timber.w("FIRSTINIT - Retrieved location $location")
var placeLocation = Location(t1.placeName)
placeLocation.latitude = t1.latitude
placeLocation.longitude = t1.longitude
t1.distance = location.distanceTo(placeLocation)
t1
})
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
return combinedFlowable
}简而言之,我正在从本地数据库(一个地点)检索一些数据,并希望将其与来自GPS的最新位置相结合:
override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
var lastLocation = locationProvider.lastKnownLocation
.doOnNext {
Timber.w("Got location $it from last one")
}
.doOnComplete {
Timber.w("did i get a location?")
}
if (requestIfEmpty) {
Timber.w("Switching to request of location")
lastLocation = lastLocation.switchIfEmpty(requestLocation())
}
return lastLocation.doOnNext {
Timber.w("Got something!")
location = it
}
}但我想说明用户没有最后一个位置的scneario,因此这一行:
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}它试图捕获任何错误,并仅使用原始请求重试,而不将其与任何内容相结合。我这样调用这段代码:
fun getPlace(placeId: String) {
locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
.onErrorResumeNext { t: Throwable ->
Timber.e(t, "Error resuming next! ")
placesRepository.getPlace(placeId)
}.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
.subscribeBy(
onNext = {
place.value = Result.success(it)
},
onError = {
Timber.e("ERROR! $it")
place.value = Result.failure(it)
}
)
.addTo(disposables)
}但是,当没有位置抛出NoSuchElementException时,我的flowable会切换到原始请求,然后在执行它时会得到一个NetworkOnMainThread异常。这个请求不应该在我放在里面的scheduler.io()上执行吗(因为我把代码放在前面了)?
如果你想知道,schedulerProvider.io()翻译成:
Schedulers.io()GetPlace:
/**
* Retrieves a single place from database
*/
override fun getPlace(id: String): Flowable<Place> {
return Flowable.merge(placesDao.getPlace(id),
refreshPlace(id).toFlowable())
}
/**
* Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
* itself indirectly (e.g. an experience)
*/
private fun refreshPlace(id: String): Single<Place> {
return from(placesApi.getPlace(id))
.doOnSuccess {
placesDao.savePlace(it)
}
}发布于 2018-07-25 21:41:45
为了确保您的网络发生在主线程之外,请显式地将其发送到后台线程。
使用Rx调度程序类中的IO、New Thread或Computation调度程序:
subscribeOn(Schedulers.computation())对于您不想这样做的情况(或者如果您认为它应该已经在后台线程上并且只想调试),您可以按如下方式记录线程信息:
Thread.currentThread().getName()如果您对观察和订阅使用不同的调度程序,则这对于跟踪使用Schedulers.trampoline()或时发生的情况特别有用:
.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())发布于 2018-07-25 22:10:14
你不能使用onErrorResumeNext来实现这种功能。你应该使用retryWhen运算符。
也许这篇文章对你很有用。
此代码使用backoff轮询服务器,直到收到与204不同的代码。也许您可以使用retryWhen而不是repeatWhen来适应您的需求。
fun pollServerWithBackoff(videoId: String, maxAttempts: Int, delay: Int): Flowable<Response<ResponseBody>> {
return api.download(videoId)
.subscribeOn(Schedulers.io())
.repeatWhen {
it
.zipWith(Flowable.range(1, maxAttempts),
BiFunction { _: Any?, attempt: Int -> attempt })
.flatMap {
Flowable.timer((it * delay).toLong(), TimeUnit.SECONDS);
}
}
.takeUntil({
it.code() != 204
})
.filter {
it.code() != 204
}
.map{
if(it.code() in 200..300)
it
else
throw IOException(it.errorBody()?.toString() ?: "Unkown Error")
}
}https://stackoverflow.com/questions/51421834
复制相似问题