添加一些代码来澄清这个问题
//generates a sequence in the range from input value (+1) to input value (+9)
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
return Observable.range(value+1,9)
.map(i -> {
Log.d(TAG, "Value " + i
+ " evaluating on " + Thread.currentThread().getName()
+ " emitting item at " + System.currentTimeMillis());
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
return new ColoredIntegerModel(i, color);
});
}
//creates a stream if say input =2 of numbers from 1-20 (input*2) such that the output is 1 (Red color) 2-10(green color) 11 (Red color) 11-20 (Green Color)
Observable<ColoredIntegerModel> getEventStream(int value) {
return Observable.create(new ObservableOnSubscribe<ColoredIntegerModel>() {
@Override
public void subscribe(ObservableEmitter<ColoredIntegerModel> emitter) throws Exception {
for (int i = 0; i < value; ++i) {
ColoredIntegerModel model = new ColoredIntegerModel(i*10, Color.RED);
emitter.onNext(model);
Observable<ColoredIntegerModel> more = getSequenceObservable(i*10, 100, Color.GREEN);
more.subscribe(new Consumer<ColoredIntegerModel>() {
@Override
public void accept(ColoredIntegerModel coloredIntegerModel) throws Exception {
emitter.onNext(coloredIntegerModel);
}
});
}
}
});
}上面的代码可以工作。它打印1(红色)2-10(绿色)11(红色),12-20,但我想要一个更干净的解决方案。我也不确定什么时候可以处理getEventStream()中的内部订阅。
基本上,问题是getEventStream为每个返回可观察值的发射调用一个函数。这类似于一系列承诺,其中每个单独的承诺都可以返回一系列其他承诺。希望这能澄清对原始问题的任何混淆。
发布于 2018-09-11 02:22:40
如果您想要的是简化上面的代码,以便所有订阅处理都留给最终订阅者,并维护后续序列发出的顺序,您可以这样做:
Observable<ColoredIntegerModel> getSequenceObservable(int value, int delay, int color) {
return Observable.range(value+1, 9)
.flatMap( i -> Observable
.just(new ColoredIntegerModel(i, color))
.delay(delay * (i + 1), TimeUnit.MILLISECONDS)
)
;
}
Observable<ColoredIntegerModel> getEventStream(int value) {
return Observable.range(0, value)
.concatMap(i ->
getSequenceObservable(i * 10,100, Color.GREEN)
.startWith(new ColoredIntegerModel(i*10, Color.RED))
)
;
}也就是说,如果您确实需要手动延迟,如果不需要,只需将上面的getSequenceObservable替换为:
Observable<ColoredIntegerModel> getSequenceObservable(int value, int color) {
return Observable.range(value+1, 9)
.map(i -> new ColoredIntegerModel(i, color))
;
}发布于 2018-09-13 03:15:14
你应该看看FlatMap operator
简而言之,它将Observable中的每个元素转换为它自己的Observable,并将它们连接起来。
对你的问题最简单的解决方案可能是这样的:
getEventStream()
.flatMap(it -> getSequenceObservable(it))
.doOnNext(System.out::print)
.blockingSubscribe();其中帮助函数是
static Observable<ColoredIntegerModel> getEventStream() {
return Observable.fromArray(
new ColoredIntegerModel(10, Color.RED),
new ColoredIntegerModel(20, Color.RED)
);
}
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.SECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN))
);
}如果你想保留getEventStream()的原始值,你可以用下面的代码代替getSequenceObservable
static Observable<ColoredIntegerModel> getSequenceObservable(ColoredIntegerModel color) {
return Observable.range(1, 10)
.flatMap(it -> Observable.timer(it, TimeUnit.MILLISECONDS)
.map(time -> new ColoredIntegerModel(time.intValue(), Color.GREEN)))
.concatWith(Observable.just(color));
}如果排放的顺序很重要,请使用带有maxConcurrency的flatMap版本:
getEventStream()
.flatMap(it -> getSequenceObservable(it), true, 1)
.doOnNext(System.out::println)
.blockingSubscribe();https://stackoverflow.com/questions/52229797
复制相似问题