我有一个非常简单的RxJava示例
List<Integer> arrayIntegers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
Observable.fromIterable(arrayIntegers).map(i -> {
Log.d("RxJava", "map i = " + i);
return i;
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).
subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer i) {
Log.d("RxJava", "next i = " + i);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.d("RxJava", "Completed");
}
});给出了这个结果..。
D/RxJava: map i = 1
D/RxJava: map i = 2
D/RxJava: map i = 3
D/RxJava: map i = 4
D/RxJava: map i = 5
D/RxJava: next i = 1
D/RxJava: next i = 2
D/RxJava: next i = 3
D/RxJava: next i = 4
D/RxJava: next i = 5但我所期待的更像是这样的
D/RxJava: map i = 1
D/RxJava: next i = 1
D/RxJava: map i = 2
D/RxJava: next i = 2
D/RxJava: map i = 3
D/RxJava: next i = 3
D/RxJava: map i = 4
D/RxJava: next i = 4
D/RxJava: map i = 5
D/RxJava: next i = 5有人能解释我做错了什么,导致我的命令是不正确的吗?
发布于 2017-11-01 14:01:31
Rx管道在两个不同的线程中运行。第一个线程(Schedulers.newThread())执行fromIterable和map。第二个调用DisposableObserver。
在这两个线程之间,observeOn()操作符中有(一个不可见的)缓冲区。因此,第一个线程得到一些cpu时间,快速完成其所有工作,并将结果放入observeOn缓冲区。这种情况可能发生在CPU上线程的旋转过程中。
然后,启动第二个线程,它从缓冲区中获取项,并通过调用DisposableObserver继续进行处理。
我认为你在这里寻找的是使用一个线程。删除observeOn操作符,看看它是否做了您希望它做的事情。
编辑:刚刚尝试删除observeOn操作符并在我的机器上运行它。结果如下:
RxJava map i = 1
RxJava next i = 1
RxJava map i = 2
RxJava next i = 2
RxJava map i = 3
RxJava next i = 3
RxJava map i = 4
RxJava next i = 4
RxJava map i = 5
RxJava next i = 5作为以后的参考,打印到达各个Rx运算符的线程的名称对于调试可能是有用的。您可以使用Thread.currentThread().getName()获取名称。
https://stackoverflow.com/questions/47056511
复制相似问题