有时,我不得不在我的观察范围内做一些清理工作(例如关闭打开的文件),我想知道什么是最好的方法。到目前为止,我已经看到了两个,但我很难理解它们的不同之处:你能解释一下它们的不同之处吗?如果有更好的方法来实现这一点?
1)
// MyObject will take care of calling onNext(), onError() and onCompleted()
// on the subscriber.
final MyObject o = new MyObject();
Observable obs = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
o.setSubscriber(subscriber);
// This will tell MyObject to start allocating resources and do its job.
o.start();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
// This will tell MyObject to finish its job and deallocate any resources.
o.stop();
}
});2)
Observable obs = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
// MyObject will take care of calling onNext(), onError() and onCompleted()
// on the subscriber.
final MyObject o = new MyObject(subscriber);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// This will tell MyObject to finish its job and deallocate any resources.
o.stop();
}
}));
// This will tell MyObject to start allocating resources and do its job.
o.start();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});发布于 2016-11-02 18:20:17
要回答您最初的问题,doOnUnSubscribe和向Subscriber添加一个Subscription都是一样的。实际上,当您调用doOnUnSubscribe时,它只是将Action作为Subscription添加到Subscriber中。因此,doOnUnSubscribe在后台使用您的第二个示例。
doOnUnSubscribe代码:
public class OperatorDoOnUnsubscribe<T> implements Operator<T, T> {
private final Action0 unsubscribe;
/**
* Constructs an instance of the operator with the callback that gets invoked when the modified Observable is unsubscribed
* @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is unsubscribed
*/
public OperatorDoOnUnsubscribe(Action0 unsubscribe) {
this.unsubscribe = unsubscribe;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
child.add(Subscriptions.create(unsubscribe));
// Pass through since this operator is for notification only, there is
// no change to the stream whatsoever.
return Subscribers.wrap(child);
}
}发布于 2016-11-03 16:32:51
您提到的两种解决方案中哪一种使用的决定取决于您试图使用/close/dispose的资源是否在多个订阅之间共享。
- This is the case in your example of `MyObject`. This has the benefit that the resource will not be exposed outside the `Observable.create()` method and thus make the resource free from accidental side-effects.
- As an example, if you want to have a counter of how many times the Observable was used, you can have a shared counter and keep on incrementing in the `doOnUnsubscribe` or `doOnSubscribe`.
- Another example can be if you want to have a counter of how many connections are currently open to the resource, you can use increment and decrement combination in `doOnSubscribe` and `doOnUnsubscribe` correspondingly to achieve that.
同样在您的示例中,您可以使用MyObject方法代替Observable.using()方法来实现同样的目的,而不是创建管理资源打开和关闭以及生成事件的Observable.using()抽象。它包含三个论点:
resourceFactory,它将打开资源,observableFactory,它将生成事件和disposeAction,它将关闭资源。发布于 2016-11-02 18:13:52
如果您没有在Observable.create的下游放置任何值,那么使用subscriber.onNext有什么意义呢?
第一个是一个巨大的不-不,因为你正在对一个已经关闭的对象做副作用。如果您同时订阅来自两个不同线程的可观察到的创建,会发生什么情况?
第二个看起来更好,因为您添加了subscriber.add,如果已经释放了订阅,它将调用o.stop()。唯一缺少的是onNext的调用,价值沿着下游移动。
有一个从资源创建可观测值的操作符,称为“使用”。请看一下http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)
https://stackoverflow.com/questions/40384855
复制相似问题