首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >理解Spring的Web反应框架

理解Spring的Web反应框架
EN

Stack Overflow用户
提问于 2017-03-30 23:32:55
回答 1查看 3.9K关注 0票数 9

我目前正在用spring-boot-starter-webflux on netty和jOOQ开发一个应用程序,包括SpringBoot 2jOOQ

下面是我经过几个小时的研究和堆栈溢出搜索后得出的代码。我已经建立了大量的日志记录,以了解在哪个线程上发生了什么。

UserController:

代码语言:javascript
复制
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    return Mono.just(user)
            .map(it -> {
                logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
                return it;
            })
            .map(userService::create)
            .map(it -> {
                logger.debug("Sending response on thread: " + Thread.currentThread().getName());
                return ResponseEntity.status(HttpStatus.CREATED).body(it);
            })
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

UserService:

代码语言:javascript
复制
public int create(ImUser user) {
    return Mono.just(user)
            .subscribeOn(Schedulers.elastic())
            .map(u -> {
                logger.debug("UserService thread: " + Thread.currentThread().getName());
                return imUserDao.insertUser(u);
            })
            .block();
}

UserDao:

代码语言:javascript
复制
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
    logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
    return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
            .values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
            .returning(IM_USER.ID)
            .fetchOne()
            .getId();
}

代码按预期工作,“接收请求”和“发送响应”都运行在同一个线程(reactor-http-server-epoll-x)上,而阻塞代码(对imUserDao.insertUser(u)的调用)运行在弹性调度器线程(elastic-x).上。事务绑定到调用注释方法的线程(这是弹性的-x),因此按预期的方式工作(我已经用不在这里发布的另一种方法对它进行了测试,以保持简单)。

下面是一个日志示例

代码语言:javascript
复制
20:57:21,384 DEBUG         admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG            admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG        admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG         tools.LoggerListener| Executing query          
...
20:57:21,401 DEBUG              tools.StopWatch| Finishing                : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG         admin.UserController| Sending response on thread: reactor-http-server-epoll-7

我已经研究了很长一段时间的反应程序,但从来没有真正的编程任何反应。现在我在想,我是否做得对。以下是我的问题:

1.上面的代码是处理传入请求、查询DB然后响应的好方法吗?请忽略logger.debug(…)为了我的理智,我构建了一些调用:)我希望有一个Flux< ImUser>作为控制器方法的参数,也就是说,我有一个多个潜在请求流,这些请求在某个时候会出现,并且都将以相同的方式处理。相反,我发现的示例在每次请求传入时都会创建一个Mono.from(...);

Mono.just(user) ()中创建的第二个Mono感到有些尴尬。我知道我需要启动一个新的流才能在弹性调度程序上运行代码,但是没有一个操作符可以这样做吗?

3.从编写代码的方式来看,我了解到UserService中的Mono将被阻塞,直到DB操作完成,但是为请求提供服务的原始流不会被阻塞。这是正确的吗?

Schedulers.elastic() 4. 4. --我计划用一个并行Scheduler代替,在这里,我可以配置工作线程的数量。这样做的想法是,最大工作线程的数量应该与最大DB连接相同。当计划程序中的所有工作线程都很忙时会发生什么?那是背压跃进的时候吗?

5.最初希望在控制器中包含以下代码:

代码语言:javascript
复制
return userService.create(user)
            .map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));

但我没有做到这一点,并使事情在正确的线程中运行。有什么方法可以在我的代码中实现这一点吗?

任何帮助都将不胜感激。谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-31 08:09:21

服务与控制器

您的服务是阻塞的这一事实是有问题的,因为在控制器中,您正在从map内部调用一个阻塞方法,该方法不会在单独的线程上移动。这有可能阻止所有控制器。

相反,您可以做的是从UserService#create返回一个block() (最后移除block())。由于服务确保Dao方法调用是孤立的,所以问题较少。从这里开始,无需在Controller中执行Mono.just(user):只需调用create并开始将操作符直接链接到生成的Mono上:

代码语言:javascript
复制
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    //this log as you saw was executed in the same thread as the controller method
    logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
    return userService.create(user)
        .map(it -> {
            logger.debug("Sending response on thread: " + Thread.currentThread().getName());
            return ResponseEntity.status(HttpStatus.CREATED).body(it);
        })
        .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

测井

请注意,如果您想要记录某项内容,还有几个比执行map并返回it更好的选择

  • doOnNext方法是为此量身定做的:对一个反应信号作出反应(在本例中,onNext:一个值被发出)并执行一些非变异操作,使输出序列与源序列完全相同。例如,doOn的“副作用”可以是写入控制台或递增统计计数器.还有doOnComplete,doOnError,doOnSubscribe,doOnCancel等.
  • log简单地将所有事件记录在上面的序列中。它将检测是否使用SLF4J,如果使用,则在调试级别使用配置的记录器。否则,它将使用JDK日志功能(因此您还需要配置它以显示调试级别的日志)。

--关于事务、或任何依赖于ThreadLocal的东西的词

ThreadLocal和线程粘着性在反应式编程中是有问题的,因为底层执行模型在整个序列中保持不变的保证较少。Flux可以在几个步骤中执行,每个步骤在不同的Scheduler中执行(线程或线程池也是如此)。即使在特定的步骤中,一个值也可以由底层线程池的线程A处理,而下一个值(稍后到达的值)将在线程B上处理。

在这种情况下,依赖Thread就不那么简单了,我们目前正在积极提供更适合于反应性世界的替代方案。

创建一个连接池大小的池的想法是好的,但不一定足够,一个事务流可能会使用多个线程,因此可能会用事务污染一些线程。

当一个池用尽线程时会发生什么

如果您使用特定的Scheduler来隔离像这里这样的阻塞行为,那么一旦它用完了线程,它就会抛出一个RejectedExecutionException

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43130036

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档