我知道subscribeOn用于在订阅序列时切换正在执行的线程,但我发现它不适合ServerRequest.body to /Flux。
有点像
Flux.just(1,2,3)
.doOnNext(integer -> log.info("test {}",integer))
.subscribeOn(Schedulers.elastic())
.subscribe();将使执行线程更改。
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 1
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 2
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 3但让我困惑的是
假设我有一个Spring WebFlux路由器:
@Configuration
public class TestRouter {
@Bean
public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
route -> route.PUT("/", req -> {
Mono<String> valueMono = req.bodyToMono(String.class);
return ServerResponse.ok().body(testService.test(valueMono), String.class);
}))).build();
}
}和服务:
@Service
@Slf4j
public class TestService {
public Mono<String> test(Mono<String> mono) {
return mono
.doOnSubscribe(subscription -> log.info("on subscribe"))
.subscribeOn(Schedulers.elastic())
.doOnNext(s -> log.info("received {}", s))
.subscribeOn(Schedulers.elastic());
}
}基本逻辑是http请求到本地主机:端口/测试将以纯文本的形式接收它发送给服务器的内容
我试图让doOnNext运行在其他线程上,而不是Spring的NIO,不管我把它放在哪里
subscribeOn执行线程始终是NIO线程:
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : received test感谢“MichaelBerry”SimonBasl,你们两个都帮了我很多忙,把你们的答案都投给我。
简而言之,netty将取代subscribeOn的http订阅,使用flatMap()在另一个Mono/Flux上包含一个单独的subscribeOn(),或者publishOn()可以完成我想要的工作。
发布于 2019-10-11 10:49:52
这不是您可以更改的东西--这只是在调用subscribeOn()之前链中的最后一个subscribe()调用,所以要由WebFlux使用它想要的任何调度程序。在本例中,它看起来是在NIO驱动的事件循环或类似的情况下处理请求。
但是,您可以在链中包含一个flatMap()调用,您可以为此指定一个不会被覆盖的单独的subscribeOn()。这可能是一个选项,取决于您的用例,因为您可以在flatMap()调用中定义的发布者中完成大部分工作。
https://stackoverflow.com/questions/58337530
复制相似问题