我可能遗漏了什么,但我不知道是什么。
以下代码根本不起任何作用:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();如果我试图阻止调用,它可以正常工作:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();奇怪的是,如果我“手动”创建了一个Flux (即不来自spring webClient),那么它工作得很好:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();有人能解释一下我做错了什么吗?.subscribe()不是应该在第一种情况下执行操作吗,就像在最后一种情况下那样?
谢谢!
发布于 2018-09-17 23:56:06
短答案
subscribe不阻塞当前线程,这意味着app主线程可以在Flux发出任何元素之前完成。所以要么使用block,要么在主线程中使用等待。
详细信息
调用无args 订阅()只在Flux上生成request(unbounded),而不设置任何Subscriber。它通常在单独的线程中触发操作,但不会阻止当前线程。最有可能的是,您的主线程在WebClient接收到该单独线程中的响应之前就结束了,doOnNext(...)也发生了。
要演示/测试已启动的操作,请在主线程中等待一段时间。只需在subscribe()调用之后放置以下行:
Thread.sleep(1000);现在,在处理超时值之后,您将能够看到打印的结果。
现在,让我们隐式地为异步操作提供一个自定义Scheduler,并等待它的所有任务完成。另外,让我们将System.out::println作为subscribe(...)参数而不是doOnNext传递,这样完整的代码如下所示:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); //still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 本例使用的订阅(消费者)略有不同。最重要的是,它添加了由publishOn(调度器)支持的ExecutorService。后者则用于等待主线程中的终止。
当然,实现相同结果的更容易的方法是使用block(),正如您最初提到的那样:
webClient.get().uri("/some/path/here").retrieve() .bodyToMono(GetLocationsResponse.class) .doOnNext(System.out::println) .block()
最后,在使用Flux.just(...)...subscribe()的第三个示例中注意--在主线程终止之前,它似乎很快就完成了。这是因为与单个String元素的发射相比,发出几个GetLocationsResponse元素所需的时间要短得多(这意味着将request+read response+parse写入POJO所需的时间)。但是,如果将此Flux设置为延迟元素,则会复制相同的行为:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); //and that makes it printing back againhttps://stackoverflow.com/questions/52375697
复制相似问题