我目前正在用spring-boot-starter-webflux on netty和jOOQ开发一个应用程序,包括SpringBoot 2和jOOQ。
下面是我经过几个小时的研究和堆栈溢出搜索后得出的代码。我已经建立了大量的日志记录,以了解在哪个线程上发生了什么。
UserController:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}UserService:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}UserDao:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}代码按预期工作,“接收请求”和“发送响应”都运行在同一个线程(reactor-http-server-epoll-x)上,而阻塞代码(对imUserDao.insertUser(u)的调用)运行在弹性调度器线程(elastic-x).上。事务绑定到调用注释方法的线程(这是弹性的-x),因此按预期的方式工作(我已经用不在这里发布的另一种方法对它进行了测试,以保持简单)。
下面是一个日志示例
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7我已经研究了很长一段时间的反应程序,但从来没有真正的编程任何反应。现在我在想,我是否做得对。以下是我的问题:
1.上面的代码是处理传入请求、查询DB然后响应的好方法吗?请忽略logger.debug(…)为了我的理智,我构建了一些调用:)我希望有一个Flux< ImUser>作为控制器方法的参数,也就是说,我有一个多个潜在请求流,这些请求在某个时候会出现,并且都将以相同的方式处理。相反,我发现的示例在每次请求传入时都会创建一个Mono.from(...);。
在Mono.just(user) ()中创建的第二个Mono感到有些尴尬。我知道我需要启动一个新的流才能在弹性调度程序上运行代码,但是没有一个操作符可以这样做吗?
3.从编写代码的方式来看,我了解到UserService中的Mono将被阻塞,直到DB操作完成,但是为请求提供服务的原始流不会被阻塞。这是正确的吗?
Schedulers.elastic() 4. 4. --我计划用一个并行Scheduler代替,在这里,我可以配置工作线程的数量。这样做的想法是,最大工作线程的数量应该与最大DB连接相同。当计划程序中的所有工作线程都很忙时会发生什么?那是背压跃进的时候吗?
5.最初希望在控制器中包含以下代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));但我没有做到这一点,并使事情在正确的线程中运行。有什么方法可以在我的代码中实现这一点吗?
任何帮助都将不胜感激。谢谢!
发布于 2017-03-31 08:09:21
服务与控制器
您的服务是阻塞的这一事实是有问题的,因为在控制器中,您正在从map内部调用一个阻塞方法,该方法不会在单独的线程上移动。这有可能阻止所有控制器。
相反,您可以做的是从UserService#create返回一个block() (最后移除block())。由于服务确保Dao方法调用是孤立的,所以问题较少。从这里开始,无需在Controller中执行Mono.just(user):只需调用create并开始将操作符直接链接到生成的Mono上:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
//this log as you saw was executed in the same thread as the controller method
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return userService.create(user)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}测井
请注意,如果您想要记录某项内容,还有几个比执行map并返回it更好的选择
doOnNext方法是为此量身定做的:对一个反应信号作出反应(在本例中,onNext:一个值被发出)并执行一些非变异操作,使输出序列与源序列完全相同。例如,doOn的“副作用”可以是写入控制台或递增统计计数器.还有doOnComplete,doOnError,doOnSubscribe,doOnCancel等.log简单地将所有事件记录在上面的序列中。它将检测是否使用SLF4J,如果使用,则在调试级别使用配置的记录器。否则,它将使用JDK日志功能(因此您还需要配置它以显示调试级别的日志)。--关于事务、或任何依赖于ThreadLocal的东西的词
ThreadLocal和线程粘着性在反应式编程中是有问题的,因为底层执行模型在整个序列中保持不变的保证较少。Flux可以在几个步骤中执行,每个步骤在不同的Scheduler中执行(线程或线程池也是如此)。即使在特定的步骤中,一个值也可以由底层线程池的线程A处理,而下一个值(稍后到达的值)将在线程B上处理。
在这种情况下,依赖Thread就不那么简单了,我们目前正在积极提供更适合于反应性世界的替代方案。
创建一个连接池大小的池的想法是好的,但不一定足够,一个事务流可能会使用多个线程,因此可能会用事务污染一些线程。
当一个池用尽线程时会发生什么
如果您使用特定的Scheduler来隔离像这里这样的阻塞行为,那么一旦它用完了线程,它就会抛出一个RejectedExecutionException。
https://stackoverflow.com/questions/43130036
复制相似问题