我有一个PublishSubject,我订阅了两次。第一个订阅者只计算处理的项目的数量,这个值总是与我通过观察者发送的内容相匹配。但是,另一个订阅者正在使用缓冲区,而我通常(75%)不会收到通过观察者的所有项目。我是不是错误地使用了缓冲区?在我停止发送给观察者以确保所有项目都被处理之后,我等待的时间比时间跨度更长。
Integer downloads1 = 0;
Integer downloads2 = 0;
PublishSubject<Object> subject = PublishSubject.create();
// this subscriber count matches the expected
subject.subscribe(s -> {
synchronized (downloads1) {
downloads1 += 1;
}
});
// this subscriber seems to miss items about 75% of the time
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
synchronized (downloads2) {
downloads2 += list.size();
}
});发布于 2014-04-17 18:31:16
也许你遇到了这个bug:https://github.com/Netflix/RxJava/issues/534
顺便说一句,你应该使用初始值为0的reduce(R initialValue, Func2<R,? super T,R> accumulator),而不是subscribe,这样你就不需要自己做任何同步了。
https://stackoverflow.com/questions/23120051
复制相似问题