首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJava Observable.doOnUnsubscribe()与Subscriber.add()

RxJava Observable.doOnUnsubscribe()与Subscriber.add()
EN

Stack Overflow用户
提问于 2016-11-02 16:21:07
回答 4查看 694关注 0票数 1

有时,我不得不在我的观察范围内做一些清理工作(例如关闭打开的文件),我想知道什么是最好的方法。到目前为止,我已经看到了两个,但我很难理解它们的不同之处:你能解释一下它们的不同之处吗?如果有更好的方法来实现这一点?

1)

代码语言:javascript
复制
    // MyObject will take care of calling onNext(), onError() and onCompleted()
    // on the subscriber.
    final MyObject o = new MyObject();

    Observable obs = Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            try {
                if (!subscriber.isUnsubscribed()) {

                    o.setSubscriber(subscriber);

                    // This will tell MyObject to start allocating resources and do its job.
                    o.start();

                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    }).doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            // This will tell MyObject to finish its job and deallocate any resources.
            o.stop();
        }
    });

2)

代码语言:javascript
复制
    Observable obs = Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            try {
                if (!subscriber.isUnsubscribed()) {

                    // MyObject will take care of calling onNext(), onError() and onCompleted()
                    // on the subscriber.
                    final MyObject o = new MyObject(subscriber);

                    subscriber.add(Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            // This will tell MyObject to finish its job and deallocate any resources.
                            o.stop();
                        }
                    }));

                    // This will tell MyObject to start allocating resources and do its job.
                    o.start();

                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2016-11-02 18:20:17

要回答您最初的问题,doOnUnSubscribe和向Subscriber添加一个Subscription都是一样的。实际上,当您调用doOnUnSubscribe时,它只是将Action作为Subscription添加到Subscriber中。因此,doOnUnSubscribe在后台使用您的第二个示例。

doOnUnSubscribe代码:

代码语言:javascript
复制
public class OperatorDoOnUnsubscribe<T> implements Operator<T, T> {
  private final Action0 unsubscribe;

/**
 * Constructs an instance of the operator with the callback that gets invoked when the modified Observable is unsubscribed
 * @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is unsubscribed
 */
public OperatorDoOnUnsubscribe(Action0 unsubscribe) {
    this.unsubscribe = unsubscribe;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
    child.add(Subscriptions.create(unsubscribe));

    // Pass through since this operator is for notification only, there is
    // no change to the stream whatsoever.
    return Subscribers.wrap(child);
  }
}
票数 3
EN

Stack Overflow用户

发布于 2016-11-03 16:32:51

您提到的两种解决方案中哪一种使用的决定取决于您试图使用/close/dispose的资源是否在多个订阅之间共享。

  1. 当资源用于生成事件时,请使用subscriber.add(...)。在这种情况下,您将不希望共享资源。
代码语言:javascript
复制
- This is the case in your example of `MyObject`. This has the benefit that the resource will not be exposed outside the `Observable.create()` method and thus make the resource free from accidental side-effects.

  1. 当您必须跨多个订阅共享某些内容时,请使用doOnUnsubscribe
代码语言:javascript
复制
- As an example, if you want to have a counter of how many times the Observable was used, you can have a shared counter and keep on incrementing in the `doOnUnsubscribe` or `doOnSubscribe`. 
- Another example can be if you want to have a counter of how many connections are currently open to the resource, you can use increment and decrement combination in `doOnSubscribe` and `doOnUnsubscribe` correspondingly to achieve that. 

同样在您的示例中,您可以使用MyObject方法代替Observable.using()方法来实现同样的目的,而不是创建管理资源打开和关闭以及生成事件的Observable.using()抽象。它包含三个论点:

  • resourceFactory,它将打开资源,
  • observableFactory,它将生成事件和
  • disposeAction,它将关闭资源。
票数 1
EN

Stack Overflow用户

发布于 2016-11-02 18:13:52

如果您没有在Observable.create的下游放置任何值,那么使用subscriber.onNext有什么意义呢?

第一个是一个巨大的不-不,因为你正在对一个已经关闭的对象做副作用。如果您同时订阅来自两个不同线程的可观察到的创建,会发生什么情况?

第二个看起来更好,因为您添加了subscriber.add,如果已经释放了订阅,它将调用o.stop()。唯一缺少的是onNext的调用,价值沿着下游移动。

有一个从资源创建可观测值的操作符,称为“使用”。请看一下http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40384855

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档