首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >弹簧5反应堆中未签署的通量

弹簧5反应堆中未签署的通量
EN

Stack Overflow用户
提问于 2018-09-17 21:06:32
回答 1查看 5.3K关注 0票数 8

我可能遗漏了什么,但我不知道是什么。

以下代码根本不起任何作用:

代码语言:javascript
复制
webClient.get().uri("/some/path/here").retrieve()
     .bodyToMono(GetLocationsResponse.class)
     .doOnNext(System.out::println)
     .subscribe();

如果我试图阻止调用,它可以正常工作:

代码语言:javascript
复制
webClient.get().uri("/some/path/here").retrieve()
      .bodyToMono(GetLocationsResponse.class)
      .doOnNext(System.out::println)
      .block();

奇怪的是,如果我“手动”创建了一个Flux (即不来自spring webClient),那么它工作得很好:

代码语言:javascript
复制
Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .doOnNext(System.out::println)
    .subscribe();

有人能解释一下我做错了什么吗?.subscribe()不是应该在第一种情况下执行操作吗,就像在最后一种情况下那样?

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-17 23:56:06

短答案

subscribe不阻塞当前线程,这意味着app主线程可以在Flux发出任何元素之前完成。所以要么使用block,要么在主线程中使用等待。

详细信息

调用无args 订阅()只在Flux上生成request(unbounded),而不设置任何Subscriber。它通常在单独的线程中触发操作,但不会阻止当前线程。最有可能的是,您的主线程在WebClient接收到该单独线程中的响应之前就结束了,doOnNext(...)也发生了。

要演示/测试已启动的操作,请在主线程中等待一段时间。只需在subscribe()调用之后放置以下行:

代码语言:javascript
复制
Thread.sleep(1000);

现在,在处理超时值之后,您将能够看到打印的结果。

现在,让我们隐式地为异步操作提供一个自定义Scheduler,并等待它的所有任务完成。另外,让我们将System.out::println作为subscribe(...)参数而不是doOnNext传递,这样完整的代码如下所示:

代码语言:javascript
复制
ExecutorService executor = Executors.newSingleThreadExecutor(); 

webClient.get().uri("/some/path/here").retrieve()
    .bodyToMono(GetLocationsResponse.class)
    .publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
    .subscribe(System.out::println); //still non-blocking

executor.awaitTermination(1, TimeUnit.SECONDS); //block current main thread 

本例使用的订阅(消费者)略有不同。最重要的是,它添加了由publishOn(调度器)支持的ExecutorService。后者则用于等待主线程中的终止。

当然,实现相同结果的更容易的方法是使用block(),正如您最初提到的那样:

webClient.get().uri("/some/path/here").retrieve() .bodyToMono(GetLocationsResponse.class) .doOnNext(System.out::println) .block()

最后,在使用Flux.just(...)...subscribe()的第三个示例中注意--在主线程终止之前,它似乎很快就完成了。这是因为与单个String元素的发射相比,发出几个GetLocationsResponse元素所需的时间要短得多(这意味着将request+read response+parse写入POJO所需的时间)。但是,如果将此Flux设置为延迟元素,则会复制相同的行为:

代码语言:javascript
复制
Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500)) //this makes it stop printing in main thread
    .doOnNext(System.out::println)
    .subscribe(); 


Flux.just("1", "2", "3")
    .filter(s -> !s.equals("2"))
    .delayElements(Duration.ofMillis(500))
    .doOnNext(System.out::println)
    .blockLast(); //and that makes it printing back again
票数 20
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52375697

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档