我是一个新的反应,并试图以有效的方式完成以下任务。我有一个表,其中包含每个用户的事件。我正在尝试获取与最新类别过滤的给定用户的每个事件名称的最新行。
table structure
user id , category , event name, details , insert timestamp, event text( payload of event)reactive // call 1 to Cassandra中的方法
Mono<Event> latestCategory = repository.findByUserId(userId).sort().next(); // sort is by insert
timestamp;//调用2到Cassandra
Flux<Event> fluxEvents = repository.findByUserId(userId)
.groupBy(Event::name) //grouping by event name
.flatmap(grp -> {
grp.sort() // sorting for each event
grp.next().zipWith(latestCategory) // picking latest row for each event
.filter(eventWithLatestCategory ->
eventWithLatestCategory.getT1().category.equals(eventWithLatestCategory.getT2().category) //filtering by each category
.map(Tuple2::getT1)// picking latest event row for latest category
};从功能上讲,一切都很正常,但我看到的问题是,表中的每一行都会发生DB调用。在命令式编程中,我可以通过一个db调用来实现它,然后应用上面的逻辑。我如何在反应式世界中做同样的事情?
enter code here发布于 2020-06-29 07:09:03
对于你所拥有的东西,“最轻触”的方法应该是:
Mono<Event> latestCategory = repository.findByUserId(userId).sort().next().cache();...which意味着latestCategory只会被获取一次,然后为所有后续订阅进行缓存。
不过,这可能不是最佳的解决方案。
在本例中,按照目前的情况,您是在Flux本身中进行排序的。这通常不是明智之举,但如果你确实需要这样做,那么你也可以这样做:
repository.findByUserId(userId).sort().collectList().map(eventList -> {
//Deal imperatively with a List<Event>
});然后,...and可以通过单个数据库调用访问该列表,您可以根据需要随机查询该列表。因为只有一次数据库调用,而且排序的Flux在整个发布器完成之前永远不会输出任何内容,所以这与简单地收集到列表之间没有实质性的区别。
然而,更好的方法是让cassandra进行底层排序,然后使用switchOnFirst()压缩第一个元素(您的最新类别)以及出现的所有其他元素:
repository.findByUserId(userId).switchOnFirst((signal, flux) ->
flux.map(val -> Tuples.of(signal.get(), val)) //(In real-world use, check the `flux` actually has a value first)
)
//...etc这意味着:
https://stackoverflow.com/questions/62625857
复制相似问题