我对在同一方法中使用多个Scheduler策略有疑问。
Function<Object, Flux<String>> fetchGreekGods = obj -> {
return Flux.just(this.config.getApiMap().get(GREEK))
.publishOn(Schedulers.immediate())
.map(toURL)
.map(fetch)
.flatMap(serializeFlux)
.log();
};
Function<Flux<String>, Flux<Tuple2<String, Integer>>> fetchWikipediaGodInfo = god -> {
return Flux.from(god)
.publishOn(Schedulers.elastic())
.map(str -> {
return new Tuple2<String, Integer>(str, generateWikiAddress
.andThen(toURL)
.andThen(fetch)
.andThen(String::length)
.apply(str));
})
.log();
};
Function<Flux<Tuple2<String, Integer>>, Flux<String>> max = godInfo -> {
return Flux.from(godInfo)
.publishOn(Schedulers.immediate())
.sort(Comparator.comparing(Tuple2::_2))
.takeLast(1)
.map(t -> t._1)
.log();
};
public Mono<String> reactorSolution() {
return Flux.empty()
.transform(fetchGreekGods)
.transform(fetchWikipediaGodInfo)
.transform(max)
.next();
}为什么我为每个转换定义不同的调度器,在最后一个中,在Max的执行中,在日志中,我观察到了前一个策略:fetchWikipediaGodInfo
在逻辑上,我希望对于max,代码使用主线程,但目前,我观察到了前一个。
2019-09-22 18:08:30 [elastic-2] INFO reactor.Flux.MapFuseable.3 - | onNext(Apollo)
2019-09-22 18:08:30 [elastic-2] INFO reactor.Flux.MapFuseable.3 - | cancel()
2019-09-22 18:08:30 [elastic-2] INFO reactor.Flux.MapFuseable.3 - | onComplete()理论上,第三个转换应该在主线程中运行,但目前该方法是在elastic中运行的,但它是不正确的,因为我显式地定义了在主线程中运行。
如何修复它?
非常感谢你提前
胡安·安东尼奥
发布于 2019-09-23 02:24:37
根据有关Schedulers的文档
Schedulers类具有提供对以下执行上下文的访问的静态方法:
当前线程不一定是主线程,它很可能是最后一个线程。之前的变换是使用弹性的。
https://stackoverflow.com/questions/58051097
复制相似问题