我有3个可观测的,第一个可观测的输出是第二个可观测的。第一和第二可观测的输出是第三可观测的要求。
Observable<String> observableOne = Observable
.just("{1}")
.map(v -> {
System.out.println("Executing Observable 1");
return v;
});
Observable<String> observableTwo = observableOne
.map(observableOneValue -> {
System.out.println("Executing Observable 2");
return "{2"+observableOneValue+"}";
});
Observable.zip(
observableOne,
observableTwo,
(observableOneValue, observableTwoValue) ->
{
System.out.println("Executing Observable 3");
return "{3"+observableOneValue+observableTwoValue+"}";
}
).blockingSubscribe(System.out::println);这重复了第一个可观察到的执行,我当然可以使第一个可观察的缓存。但我想知道是否有更好的选择,特别是我正在寻找某种从第一个到另两个可观测到的消息传递构造。
发布于 2018-04-11 03:40:32
我不知道你到底想通过一个“消息传递结构”来寻找什么。cache将适用于上面的示例,但您提到您不想使用该示例。
另一个可能适合您的用例的选项是使用ConnectableObservable。它只在调用connect时才开始发出项目,而不是在订阅时。通过调用observableOne将您的ConnectableObservable转换为publish。那就设置你所有的订户。然后调用connect on observableOne。
ConnectableObservable<String> observableOne = Observable
.just("{1}")
.map(v -> {
System.out.println("Executing Observable 1");
return v;
}).publish();
Observable<String> observableTwo = observableOne
.map(observableOneValue -> {
System.out.println("Executing Observable 2");
return "{2"+observableOneValue+"}";
});
Observable.zip(
observableOne,
observableTwo,
(observableOneValue, observableTwoValue) ->
{
System.out.println("Executing Observable 3");
return "{3"+observableOneValue+observableTwoValue+"}";
}
).subscribe(System.out::println);
// Call when all the subscribers are ready --
observableOne.connect();备注
observableOne现在是ConnectableObservablesubscribe而不是blockingSubscribe,以便代码执行connect调用。https://stackoverflow.com/questions/49765151
复制相似问题