在我的Android应用程序中,我使用的是域级存储接口,它的后台是使用SqlBrite实现的本地DB和带有Retrofit可观测数据的network。因此,我在仓库中有方法getDomains(): Observable<List<Domain>>,在Retrofit和SqlBrite中有两个相应的方法。我不想把这两个可观察到的东西连在一起或合并。我希望我的存储库只从SqlBrite获取数据,而且由于SqlBrite返回QueryObservable,这会在底层数据每次更改时触发onNext(),因此我可以独立运行网络请求并将结果存储到SqlBrite,并通过从网络获取并存储到DB数据来更新可观察到的结果。因此,我尝试实现我的仓库的getDomains()方法,如下所示:
fun getDomains(): Observable<List<Domain>> {
return db.getDomains()
.doOnSubscribe {
networkClient.getDomains()
.doOnNext { db.putDomains(it) }
.onErrorReturn{ emptyList() }
.subscribe()
}
}但是在这种情况下,每次客户端应该订阅,每次它会发出网络请求,这都不是很好。我考虑过其他do...操作符将请求移到那里,但是doOnCompleted()在QueryObservable的情况下永远不会被调用,除非我在某个地方调用toBlocking() (我不会调用),doOnEach()也不是很好,因为每次从db提取项时,doOnEach()都会发出请求。我也尝试使用replay()操作符,但是虽然在本例中缓存了可观察到的缓存,但订阅会发生并导致网络请求。那么,如何才能将这两个可观测到的数据以理想的方式结合起来呢?
发布于 2016-06-27 07:48:51
好的,这取决于您所拥有的具体用例:即假设您希望显示来自本地数据库的最新数据,并通过在后台执行网络请求来不时更新数据库。
也许有更好的方法,但也许你可以这样做
fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> =
stateDeterminer.getState().flatMap {
when (it) {
State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly
State.NO_DATA ->
networkClient.getDomains() // no data so first do the network call
.flatMap { db.save(it) } // save network call result in database
.flatMap { databaseQuery } // continue with original observable
State.SYNC_IN_BACKGROUND -> {
// Execute sync in background
networkClient.getDomains()
.flatMap { db.save(it) }
.observeOn(backgroundSyncScheduler)
.subscribeOn(backgroundSyncScheduler)
.subscribe({}, { Timber.e(it, "Error when starting background sync") }, {})
// Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically
databaseQuery
}
}
}因此,在最后创建SQLBrite可观察性(QueryObservable)并将其传递给createDataAwareObservable()函数。如果这里没有数据,它将确保从网络加载数据,否则它将检查数据是否应该在后台更新(将数据保存到数据库中,然后数据库将自动更新SQLBrite QueryObservable ),或者数据是否是最新的。
基本上你可以这样使用它:
createDataAwareObservable( db.getAllDomains() ).subscribe(...)因此,对于这个createDataAwareObservable()的用户来说,当您作为参数传入时,总是会得到相同类型的Observable<T>。所以从本质上说,你似乎一直在订阅db.getAllDomains() .
发布于 2016-06-26 18:05:32
如果您的问题是每次您想要获取数据时都必须订阅您的观察者,那么您可以使用中继,因为没有实现onComplete,所以它永远不会取消对观察者的订阅。
/**
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
* and all the next items passed to the pipeline.
*/
@Test
public void testRelay() throws InterruptedException {
BehaviorRelay<String> relay = BehaviorRelay.create("default");
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}这里的另一个例子是https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
https://stackoverflow.com/questions/38041097
复制相似问题