我试图将我的单声道分裂为其他分离的单声道,这些单声道将在不同线程上处理相同的数据输入数据。
public Mono<String> process() {
Mono<String> someString = ... // fetching data from API
someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toLowerCase)
.subscribe(this::saveLowercase);
someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toUpperCase)
.subscribe(this::saveUpperCase);
return someString;
}我在日志中看到我获取了3次数据,因为每次订阅调用都会从API中获取数据。我想使用.cache()方法,但我想知道一些新的更好的方法来只调用一次API并处理成倍的数据?如果Reactor不能做到这一点,我可以将Reactor换成RxJava。
发布于 2019-12-11 21:20:49
在您给出的示例中,您创建了一个冷Mono。因此,正如您所说的,每个subscriber都会发生HTTP调用。如果希望HTTP调用只发生一次,请创建hot observable。您必须使用.cache(),以便任何将来的订阅者都能获得响应。
这是执行此操作的正确方法。为什么你要寻找“更好的东西”?
官方文档中的示例:
DirectProcessor<String> hotSource = DirectProcessor.create();
Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete(); 现在,两个订阅者都将获得所有的颜色。
https://stackoverflow.com/questions/59236763
复制相似问题