首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring Reactor线程模型

Spring Reactor线程模型
EN

Stack Overflow用户
提问于 2018-04-19 05:45:34
回答 2查看 8.1K关注 0票数 5

新手提醒Spring Webflux (v2.0.1.RELEASE)。

我想使用Spring Webflux作为后端(无网络)应用程序来处理来自JMS侦听器的大量数据。

我的理解是Spring Webflux提供了一个非阻塞/异步并发模型。然而,我有一个基本问题,我需要一些帮助。作为免责声明,反应式编程的整个概念对我来说是非常新的,我仍然处于这种范式转变的过程中。

考虑下面的代码:

代码语言:javascript
复制
Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

我从文档中了解到,在调用“订阅”函数之前,事件处理链不会发生任何变化。

但在内部,spring是否(如果愿意)为"map“函数中的每个函数异步使用单独的线程?如果spring为这些链使用“单个”线程,那么这里的真正目的是什么?它不是基于不同语法的阻塞和单线程模型吗?

我观察到代码总是以相同的线程顺序运行。spring webflux的线程模型是什么?

EN

回答 2

Stack Overflow用户

发布于 2018-05-04 19:30:30

反应式编程是一种编程范式,因此它不会对技术实现做出任何假设。

The reactive manifesto描述了反应式系统,并在桌面上带来了异步通信和背压。除此之外,它也不会对技术细节做出任何假设。

Spring Reactor是Webflux的基础,它是一个允许您轻松构建反应式系统并遵循反应式编程范例的库。

流使用的线程取决于发布者。默认情况下使用当前线程。如果没有任何干预,如果发布者是同步的,则流不能是异步的。如果发布者阻止,则流将被阻止。但以下面的例子为例:

代码语言:javascript
复制
Flux.interval(Duration.ofMillis(100))
    .take(2)
    .subscribe(i -> System.out.println(Thread.currentThread().getName()));

Flux.interval在另一个线程上发布,因此链在另一个线程中异步运行。

让我们看另一个例子:

代码语言:javascript
复制
Scheduler scheduler = Schedulers.newElastic("foo");

Flux<Integer> flux = Flux.just(1, 2)
    .subscribeOn(scheduler);

flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));

您将注意到,每个订阅者都在其自己的线程上运行(虽然来自相同的线程池)。publishOn运算符与此类似。

如果您订阅了发布者,则可以使用相同的编程范例,而不管它是同步的还是异步的。而且您总是可以通过添加subscribeOnpublishOn操作符来引入异步行为。

票数 5
EN

Stack Overflow用户

发布于 2018-04-19 22:57:23

TL;DR:

Spring这是一个项目反应器的事情,-

  1. 并不决定在哪个项目反应器上运行什么操作,这使得判断你在哪里跨越线程边界变得更容易。此外,没有进行(显式)同步;这使得引入并发问题变得更加困难。

不,它不是具有不同语法的单线程模型。项目反应器尝试尽可能多地使用主线程来避免上下文切换。此外,它还提供了特殊的运算符,使您可以指定在其上运行先前操作的线程。

例如,这个修改后的示例将在不同的线程上运行;因为subscribeOn操作符定义了整个链在哪个线程池上运行:

代码语言:javascript
复制
Mono.just("ONE")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

Mono.just("TWO")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

在这种情况下,这两个操作都在elastic-x线程上执行;不会阻塞主线程。每次运行操作的顺序可能会有所不同。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49909488

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档