短篇小说:我有一种情况,我有两个可观察到的,只有一个目的:
他们各自负责处理不同类型的数据。此外,当两个数据都已被处理时,我想要做一些事情。
我目前最好的实现如下,这些是我的观察:
Single<BlueData> blueObservable = Single.create(singleSubscriber -> {
if (BlueDataProcessor.isDataValid(myBlueData)) {
singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData));
}
else {
singleSubscriber.onError(new BlueDataIsInvalidThrow());
}
});
Single<RedData> redObservable = Single.create(singleSubscriber -> {
if (RedDataProcessor.isDataValid(myRedData)) {
singleSubscriber.onSuccess(RedDataProcessor.process(myRedData));
}
else {
singleSubscriber.onError(new RedDataIsInvalidThrowable());
}
});
Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable,
(blueData, redData) -> PurpleGenerator.combine(blueData, redData));我还有以下订阅:
blueObservable.subscribe(
result -> {
saveBlueProcessStats(result);
},
throwable -> {
logError(throwable);
});
redObservable.subscribe(
result -> {
saveRedProcessStats(result);
},
throwable -> {
logError(throwable);
});
composedSingle.subscribe(
combinedResult -> {
savePurpleProcessStats(combinedResult)
},
throwable -> {
logError(throwable);
});我的问题是:蓝红数据被处理了两次,因为这两个订阅都会再次运行,我订阅了用Observable.zip()创建的可观察到的组合。
如果不两次运行这两个操作,我怎么会有这种行为?
发布于 2016-11-16 17:40:57
对于1.x中的Single来说,这是不可能的,因为没有ConnectableSingle和Single.publish的概念。您可以通过2.x和RxJava2Extensions库实现这种效果:
SingleSubject<RedType> red = SingleSubject.create();
SingleSubject<BlueType> blue = SingleSubject.create();
// subscribe interested parties
red.subscribe(...);
blue.subscribe(...);
Single.zip(red, blue, (r, b) -> ...).subscribe(...);
// connect()
blueObservable.subscribe(blue);
redObservable.subscribe(red);https://stackoverflow.com/questions/40638488
复制相似问题