首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >最有效的方法是将可观察的转化为可观察值/绑定/事件流?

最有效的方法是将可观察的转化为可观察值/绑定/事件流?
EN

Stack Overflow用户
提问于 2015-05-14 15:31:54
回答 2查看 853关注 0票数 0

我将更多地使用RxJava和ReactFX,但我想了解的是如何调和这两者,因为ReactFX没有RxJava依赖关系,那么这两个人怎么能在同一个单子内相互交谈呢?尤其是在JavaFX的ObservableValue、RxJava的Observable和ReactFX的StreamEvent之间架设桥梁时,没有太多的样板。

我想用RxJava构建我的核心业务逻辑,因为他们并不总是支持JavaFX应用程序。但是我希望JavaFX UI使用ReactFX并利用EventStream。因此,我的问题是,将EventStream转换为Observable,将Observable转换为EventStreamBindingObservableValue,最有效的方法是什么?我知道我可以全面使用RxJava,但我想利用ReactFX平台的线程安全和方便.

代码语言:javascript
复制
//DESIRE 1- Turn EventStream into Observable in the same monad
Observable<Foo> obs = EventStream.valuesOf(fooObservableValue).toObservable();

//Desire 2- Turn Observable into ObservableValue, Eventstream, or Binding
Binding<Foo> obsVal = Observable.create(...).toBinding();
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-05-14 17:23:52

将ReactFX EventStream转化为RxJava Observable

代码语言:javascript
复制
Observable<Foo> toRx(EventStream<Foo> es) {
    PublishSubject<Foo> sub = PublishSubject.create();
    es.subscribe(sub::onNext);
    return sub;
}

将RxJava Observable转化为ReactFX EventStream

代码语言:javascript
复制
EventStream<Foo> fromRx(Observable<Foo> obs) {
    EventSource<Foo> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)));
    return es;
}

注意后一种情况下的Platform.runLater(...)。这使得生成的EventStream在JavaFX应用程序线程上发出事件。

还要注意,在这两种情况下,我们都忽略了从subscribe方法返回的subscribe。如果要在应用程序的生存期内建立绑定,这是很好的。另一方面,如果它们之间的绑定应该是短暂的,在第一种情况下,您将让您的RxJava组件公开Subject,您的ReactFX组件公开EventStream,然后根据需要执行subscribe/unsubscribe。对于第二种情况也是如此。

票数 3
EN

Stack Overflow用户

发布于 2015-05-14 17:42:28

我不熟悉ReactFX,但是看看我可以推断出这些转换的API:

代码语言:javascript
复制
public static <T> Observable<T> toObservable(EventStream<? extends T> es) {
    return Observable.create(child -> {
        Subscription s = es.subscribe(child::onNext);
        child.add(Subscriptions.create(s::unsubscribe));
    });
}
public static <T> EventStream<T> toEventStream(Observable<? extends T> o) {
    return new EventStream<T>() {
        final Vector<Consumer<? super T>> observers = new Vector<>();
        @Override
        public void addObserver(Consumer<? super T> observer) {
            observers.add(observer);
        }

        @Override
        public void removeObserver(Consumer<? super T> observer) {
            observers.remove(observer);
        }
        @Override
        public Subscription subscribe(Consumer<? super T> subscriber) {
            addObserver(subscriber);

            rx.Subscriber<T> s = new rx.Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    subscriber.accept(t);
                }
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    removeObserver(subscriber);
                }
                @Override
                public void onCompleted() {
                    removeObserver(subscriber);
                }
            };
            o.subscribe(s);

            return () -> {
                s.unsubscribe();
                removeObserver(subscriber);
            };
        }
    };
}

尽管ReactFX不支持同步取消订阅,但两者都应该提供取消订阅的功能,而且我也不知道EventStream是否可以作为一个热的或冷的可观察的。我无法访问绑定所以帮不了你。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30241292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档