我是Rx-java的新手,我正在尝试理解replay() autoConnect()是如何工作的。我的代码中有以下情况,我想让代价高昂的操作更有效率。
public static void main(String[] args) {
Single<String> observable = getStringObservable().observeOn(Schedulers.io()).toSingle();
Single<String> observable1 = getStringObservable().observeOn(Schedulers.io()).toSingle();
String observable2 = getStringObservable().toSingle().toBlocking().value();
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable1.subscribe(s -> System.out.println("Sub2 got: " + s));
System.out.println("Sub3 got " + observable2);
}
//This is some expensive operation which is a network call
private static Observable<String> getStringObservable() {
return Single.just("Event")
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
}).toObservable().replay().autoConnect();
}代码的输出是-
Expensive operation for Event
Expensive operation for Event
Sub1 got: Event
Expensive operation for Event
Sub3 got Event
Sub2 got: Event我想要更有效率的是-
Expensive operation for Event
Sub1 got: Event
Sub2 got Event
Sub3 got: Event我对replay autoConnect的理解是,它会将响应保存给订阅者。我尝试了autoConnect(3),因为我有三个订阅者,但它似乎不起作用。
一些假设-上面的代码是我的代码的伪代码。在我的实际代码中,这三个observable都位于不同的工作流中,并且可以按任何顺序调用,如observable >>、observable1、>>、observable2 OR observable2、>>、observable、>>、observable1等等。
我不能在我的代码中存储任何可以共享的可观察对象的状态。
如果你需要更多的信息,请告诉我。
感谢您的阅读。
发布于 2018-06-15 15:20:30
您可以缓存操作,但必须保留对可观察对象的引用,如下所示:
ConnectableObservable<String> cachedObservable = Observable.just("Event")
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
}).replay();
cachedObservable.connect();
Single<String> observable = cachedObservable.observeOn(Schedulers.io()).singleOrError();
Single<String> observable1 = cachedObservable.observeOn(Schedulers.io()).singleOrError();
String observable2 = cachedObservable.singleOrError().blockingGet();
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable1.subscribe(s -> System.out.println("Sub2 got: " + s));
System.out.println("Sub3 got " + observable2);打印
Expensive operation for Event
Sub1 got: Event
Sub2 got: Event
Sub3 got Event如果您确实希望将其作为网络操作的缓存,则可以使用Guava's memoize缓存可观察对象一段时间:
Supplier<ConnectableObservable<String>> cache = Suppliers.memoizeWithExpiration(() -> {
ConnectableObservable<String> cached = Observable.just("Event")
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
}).replay();
cached.connect();
return cached;
}, 1, TimeUnit.MINUTES);
cache.get().subscribe(thing -> doAwesomeStuffWith(thing));发布于 2018-06-15 06:29:47
您需要3个订阅者,但您只创建了一些流,然后阻塞了第一个消费者,这将阻止其他2个订阅者订阅。先订阅非阻塞的,再订阅阻塞的:
Single<String> observable = getStringObservable().observeOn(Schedulers.io()).toSingle();
Single<String> observable1 = getStringObservable().observeOn(Schedulers.io()).toSingle();
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable1.subscribe(s -> System.out.println("Sub2 got: " + s));
String observable2 = getStringObservable().toSingle().toBlocking().value();
System.out.println("Sub3 got " + observable2);https://stackoverflow.com/questions/50866566
复制相似问题