有没有从io.reactivex.ObservableEmitter<T>转换到io.reactivex.Observer<T>的简单方法?我在rx-java2库中找不到这样做的函数。
这个实现看起来很简单:
public static <T> Observer<T> toObserver(ObservableEmitter<T> oe) {
return new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
oe.setDisposable(d);
}
@Override
public void onNext(T t) {
oe.onNext(t);
}
@Override
public void onError(Throwable e) {
oe.onError(e);
}
@Override
public void onComplete() {
oe.onComplete();
}
};
}但它认为它应该是标准库实现的一部分,因为它在rx-java2中提供了两个核心类型之间的转换。
基本上,我尝试将以下代码从rxjava1迁移到rxjava2
class X<T, O1, O2> implements Transformer<T, Either<O1, O2>> {
Transformer<T, O1> t1;
Transformer<T, O2> t2;
@Override
public Observable<Either<O1, O2>> call(Observable<T> input) {
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
return Observable.<Either<O1, O2>>create(new OnSubscribe<Either<O1, O2>>() {
@Override
public void call(final Subscriber<? super Either<O1, O2>> sub) {
t1.call(Observable.just(t)).map(o1 -> Either.<O1, O2>left(o1)).subscribe(sub);
t2.call(Observable.just(t)).map(o2 -> Either.<O1, O2>right(o2)).subscribe(sub);
}
});
}
});
}}
注意,OnSubscribe提供了Subscriber接口,然后我可以用它来订阅另外两个Observable's,需要进行rxjava2转换。
发布于 2017-08-07 22:48:52
看起来您需要publish(Function):(顺便说一下,您的代码看起来很复杂,并且违反了v1中的协议)。
ObservableTransformer<T, O1> t1 = ...
ObservableTransformer<T, O2> t2 = ...
ObservableTransformer<T, Either<O1, O2>> combiner = o ->
o.publish(g -> Observable.merge(
t1.apply(g).map(o1 -> Either.<O1, O2>left(o1)),
t2.apply(g).map(o2 -> Either.<O1, O2>right(o2))
));如果您真的想坚持使用外部flatMap (以防内部异步),请使用merge()而不是create
return input.flatMap(new Func1<T, Observable<Either<O1, O2>>>() {
@Override
public Observable<Either<O1, O2>> call(final T t) {
Observable<T> just = Observable.just(t);
Observable.merge(
t1.call(just).map(o1 -> Either.<O1, O2>left(o1)),
t2.call(just).map(o2 -> Either.<O1, O2>right(o2))
)
}
});https://stackoverflow.com/questions/45546342
复制相似问题