首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring Reactive实现多次调用db

Spring Reactive实现多次调用db
EN

Stack Overflow用户
提问于 2020-06-29 01:16:43
回答 1查看 257关注 0票数 1

我是一个新的反应,并试图以有效的方式完成以下任务。我有一个表,其中包含每个用户的事件。我正在尝试获取与最新类别过滤的给定用户的每个事件名称的最新行。

代码语言:javascript
复制
table structure
user id , category , event name, details , insert timestamp, event text( payload of event)

reactive // call 1 to Cassandra中的方法

代码语言:javascript
复制
Mono<Event> latestCategory = repository.findByUserId(userId).sort().next(); // sort is by insert 
timestamp;

//调用2到Cassandra

代码语言:javascript
复制
Flux<Event> fluxEvents = repository.findByUserId(userId)
                .groupBy(Event::name) //grouping by event name
                 .flatmap(grp -> {
                 grp.sort() // sorting for each event
                 grp.next().zipWith(latestCategory) // picking latest row for each event
                 .filter(eventWithLatestCategory -> 
eventWithLatestCategory.getT1().category.equals(eventWithLatestCategory.getT2().category) //filtering by each category
.map(Tuple2::getT1)// picking latest event row for latest category
};

从功能上讲,一切都很正常,但我看到的问题是,表中的每一行都会发生DB调用。在命令式编程中,我可以通过一个db调用来实现它,然后应用上面的逻辑。我如何在反应式世界中做同样的事情?

代码语言:javascript
复制
enter code here
EN

回答 1

Stack Overflow用户

发布于 2020-06-29 07:09:03

对于你所拥有的东西,“最轻触”的方法应该是:

代码语言:javascript
复制
Mono<Event> latestCategory = repository.findByUserId(userId).sort().next().cache();

...which意味着latestCategory只会被获取一次,然后为所有后续订阅进行缓存。

不过,这可能不是最佳的解决方案。

在本例中,按照目前的情况,您是在Flux本身中进行排序的。这通常不是明智之举,但如果你确实需要这样做,那么你也可以这样做:

代码语言:javascript
复制
repository.findByUserId(userId).sort().collectList().map(eventList -> {
    //Deal imperatively with a List<Event>
});

然后,...and可以通过单个数据库调用访问该列表,您可以根据需要随机查询该列表。因为只有一次数据库调用,而且排序的Flux在整个发布器完成之前永远不会输出任何内容,所以这与简单地收集到列表之间没有实质性的区别。

然而,更好的方法是让cassandra进行底层排序,然后使用switchOnFirst()压缩第一个元素(您的最新类别)以及出现的所有其他元素:

代码语言:javascript
复制
repository.findByUserId(userId).switchOnFirst((signal, flux) ->
    flux.map(val -> Tuples.of(signal.get(), val)) //(In real-world use, check the `flux` actually has a value first)
)
//...etc

这意味着:

  • 您只需要一个数据库查询;
  • 您可以在每个值出现时对其进行反应性处理,而无需等待整个流结束;
  • 不需要缓存发布者,如有必要,最好避免缓存。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62625857

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档