我正在创建一个多个观察者订阅的观察体。但是,我看到只有一个观察者onNext被调用。我到底做错了什么。
private Observable<String> getDataEmitter(String downloadFileName) {
ObservableOnSubscribe<String> handler = emitter -> {
String file = download(downloadFileName);
if (file == null) {
emitter.onError(e);
}
String[] tokens = file.split("\\n");
for (int i = 0; i < tokens.length; i++) {
emitter.onNext(token);
}
emitter.onComplete();
};
return Observable.create(handler);
}
public retrieve(String file) {
final Observable<String> sourceObservable = getDataEmitter(file)
.flatMap(id -> {
return Observable.from(service.find(id))
}, Pair::of)
.map(pair -> collect(request, pair));
sourceObservable
.flatMap(this::map)
.map(this::fileFormat)
.buffer(10)
.subscribe(batched -> {
System.out.println("b-1");
}, err -> {
System.out.println("error-1");
},
() -> {
System.out.println("completed-1");
});
sourceObservable
.map(pair -> format(pair))
.subscribe(e -> {
System.out.println("e-2" +e);
},
error -> System.out.println(error-2),
() -> System.out.println("completed-2"));
}输出仅来自第一个订阅。那就是,
b-1
completed-1为什么第二次订阅没有打印出来?我试着使用发布与连接,甚至重放。但是,这并没有帮助。我在这里做错了什么?
发布于 2019-01-19 11:38:00
您的可观测对象已在.buffer(10)上终止,因为一旦源可观测对象终止(see the documentation),buffers将从源可观测对象收集10个排放,并将整个发射作为批处理对象发出。你不会再观察到它的排放了。
https://stackoverflow.com/questions/54262549
复制相似问题