我将更多地使用RxJava和ReactFX,但我想了解的是如何调和这两者,因为ReactFX没有RxJava依赖关系,那么这两个人怎么能在同一个单子内相互交谈呢?尤其是在JavaFX的ObservableValue、RxJava的Observable和ReactFX的StreamEvent之间架设桥梁时,没有太多的样板。
我想用RxJava构建我的核心业务逻辑,因为他们并不总是支持JavaFX应用程序。但是我希望JavaFX UI使用ReactFX并利用EventStream。因此,我的问题是,将EventStream转换为Observable,将Observable转换为EventStream、Binding或ObservableValue,最有效的方法是什么?我知道我可以全面使用RxJava,但我想利用ReactFX平台的线程安全和方便.
//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();
//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();发布于 2015-05-14 17:23:52
将ReactFX EventStream转化为RxJava Observable
Observable<Foo> toRx(EventStream<Foo> es) {
PublishSubject<Foo> sub = PublishSubject.create();
es.subscribe(sub::onNext);
return sub;
}将RxJava Observable转化为ReactFX EventStream
EventStream<Foo> fromRx(Observable<Foo> obs) {
EventSource<Foo> es = new EventSource<>();
obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
return es;
}注意后一种情况下的Platform.runLater(...)。这使得生成的EventStream在JavaFX应用程序线程上发出事件。
还要注意,在这两种情况下,我们都忽略了从subscribe方法返回的subscribe。如果要在应用程序的生存期内建立绑定,这是很好的。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,您将让您的RxJava组件公开Subject,您的ReactFX组件公开EventStream,然后根据需要执行subscribe/unsubscribe。对于第二种情况也是如此。
发布于 2015-05-14 17:42:28
我不熟悉ReactFX,但是看看我可以推断出这些转换的API:
public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
return Observable.create(child -> {
Subscription s = es.subscribe(child::onNext);
child.add(Subscriptions.create(s::unsubscribe));
});
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
return new EventStream<T>() {
final Vector<Consumer<? super T>> observers = new Vector<>();
@Override
public void addObserver(Consumer<? super T> observer) {
observers.add(observer);
}
@Override
public void removeObserver(Consumer<? super T> observer) {
observers.remove(observer);
}
@Override
public Subscription subscribe(Consumer<? super T> subscriber) {
addObserver(subscriber);
rx.Subscriber<T> s = new rx.Subscriber<T>() {
@Override
public void onNext(T t) {
subscriber.accept(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
removeObserver(subscriber);
}
@Override
public void onCompleted() {
removeObserver(subscriber);
}
};
o.subscribe(s);
return () -> {
s.unsubscribe();
removeObserver(subscriber);
};
}
};
}尽管ReactFX不支持同步取消订阅,但两者都应该提供取消订阅的功能,而且我也不知道EventStream是否可以作为一个热的或冷的可观察的。我无法访问绑定所以帮不了你。
https://stackoverflow.com/questions/30241292
复制相似问题