首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx Java :未订阅replay() autoconnect()

Rx Java :未订阅replay() autoconnect()
EN

Stack Overflow用户
提问于 2018-06-15 06:03:51
回答 2查看 597关注 0票数 0

我是Rx-java的新手,我正在尝试理解replay() autoConnect()是如何工作的。我的代码中有以下情况,我想让代价高昂的操作更有效率。

代码语言:javascript
复制
public static void main(String[] args) {
    Single<String> observable = getStringObservable().observeOn(Schedulers.io()).toSingle();
    Single<String> observable1 = getStringObservable().observeOn(Schedulers.io()).toSingle();
    String observable2 = getStringObservable().toSingle().toBlocking().value();

    observable.subscribe(s -> System.out.println("Sub1 got: " + s));
    observable1.subscribe(s -> System.out.println("Sub2 got: " + s));
    System.out.println("Sub3 got " + observable2);
}

//This is some expensive operation which is a network call
private static Observable<String> getStringObservable() {
    return Single.just("Event")
                .map(s -> {
                    System.out.println("Expensive operation for " + s);
                    return s;
                }).toObservable().replay().autoConnect();
}

代码的输出是-

代码语言:javascript
复制
Expensive operation for Event
Expensive operation for Event
Sub1 got: Event
Expensive operation for Event
Sub3 got Event
Sub2 got: Event

我想要更有效率的是-

代码语言:javascript
复制
Expensive operation for Event
Sub1 got: Event
Sub2 got Event
Sub3 got: Event

我对replay autoConnect的理解是,它会将响应保存给订阅者。我尝试了autoConnect(3),因为我有三个订阅者,但它似乎不起作用。

一些假设-上面的代码是我的代码的伪代码。在我的实际代码中,这三个observable都位于不同的工作流中,并且可以按任何顺序调用,如observable >>、observable1、>>、observable2 OR observable2、>>、observable、>>、observable1等等。

我不能在我的代码中存储任何可以共享的可观察对象的状态。

如果你需要更多的信息,请告诉我。

感谢您的阅读。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-06-15 15:20:30

您可以缓存操作,但必须保留对可观察对象的引用,如下所示:

代码语言:javascript
复制
ConnectableObservable<String> cachedObservable = Observable.just("Event")
        .map(s -> {
            System.out.println("Expensive operation for " + s);
            return s;
        }).replay();
cachedObservable.connect();

Single<String> observable = cachedObservable.observeOn(Schedulers.io()).singleOrError();
Single<String> observable1 = cachedObservable.observeOn(Schedulers.io()).singleOrError();
String observable2 = cachedObservable.singleOrError().blockingGet();

observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable1.subscribe(s -> System.out.println("Sub2 got: " + s));
System.out.println("Sub3 got " + observable2);

打印

代码语言:javascript
复制
Expensive operation for Event
Sub1 got: Event
Sub2 got: Event
Sub3 got Event

如果您确实希望将其作为网络操作的缓存,则可以使用Guava's memoize缓存可观察对象一段时间:

代码语言:javascript
复制
Supplier<ConnectableObservable<String>> cache = Suppliers.memoizeWithExpiration(() -> {
    ConnectableObservable<String> cached = Observable.just("Event")
            .map(s -> {
                System.out.println("Expensive operation for " + s);
                return s;
            }).replay();
    cached.connect();
    return cached;
}, 1, TimeUnit.MINUTES);

cache.get().subscribe(thing -> doAwesomeStuffWith(thing));
票数 3
EN

Stack Overflow用户

发布于 2018-06-15 06:29:47

您需要3个订阅者,但您只创建了一些流,然后阻塞了第一个消费者,这将阻止其他2个订阅者订阅。先订阅非阻塞的,再订阅阻塞的:

代码语言:javascript
复制
Single<String> observable = getStringObservable().observeOn(Schedulers.io()).toSingle();
Single<String> observable1 = getStringObservable().observeOn(Schedulers.io()).toSingle();

observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable1.subscribe(s -> System.out.println("Sub2 got: " + s));

String observable2 = getStringObservable().toSingle().toBlocking().value();

System.out.println("Sub3 got " + observable2);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50866566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档