在这些条件下考虑以下代码:
getOneResponePage(int)生成一个Flux<Integer>。它模拟从另一个服务中获取结果页的请求。它的实现应该可以被当作黑匣子来回答我的问题。(关于它的实际用途,请参见下面的说明。)
getOneResponePage(int)最终将返回一个空的Flux<Integer>,这是表示不会出现更多结果的方式。(但它将继续发射空的Flux<Integer>。)
package ch.cimnine.test;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class PaginationTest {
@Test
public void main() {
final Flux<Integer> finalFlux = getAllResponses();
finalFlux.subscribe(resultItem -> {
try {
Thread.sleep(200); // Simulate heavy processing
} catch (InterruptedException ignore) {
}
System.out.println(resultItem);
});
}
private Flux<Integer> getAllResponses() {
Flux<Flux<Integer>> myFlux = Flux.generate(
() -> 0, // inital page
(page, sink) -> {
var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()
// my way to check whether the `innerFlux` is now empty
innerFlux.hasElements().subscribe(
hasElements -> {
if (hasElements) {
System.out.println("hasElements=true");
sink.next(innerFlux);
return;
}
System.out.println("hasElements=false");
sink.complete();
}
);
return page + 1;
}
);
return Flux.concat(myFlux);
}
private Flux<Integer> getOneResponePage(int page) {
System.out.println("Request for page " + page);
// there's only content on the first 3 pages
if (page < 3) {
return Flux
.just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
.map(i -> (1000 * page) + i);
}
return Flux.empty();
}
}目标
目标是有一个名为getAllResponses()的方法,它返回结果的连续Flux<T>。调用方不应该知道--或者不关心--某些分页逻辑在内部发生。所有其他方法都将对调用方隐藏。
问题
由于我刚开始使用反应性编程,我是否认为这个right?
getOneResponePage(int)到底是什么?
在我的实际代码中,getOneResponsePage(int)使用org.springframework.web.reactive.function.client.WebClient发送请求。它连接到正在返回结果的服务。该服务每次调用只返回1000个结果。必须发送offset参数才能获得更多的结果。
这个API有点奇怪,因为要确定是否有所有结果,唯一的方法是使用一个不断增加的offset反复查询它,直到得到一个空的结果集。它将很高兴地为一个仍在增长的offset (…)返回更多的空结果集。直到达到offset的某些内部最大值并返回一个400 Bad Request为止。)
getOneResponePage(int)的实际实现几乎与此相同:
private Flux<ResponseItem> getOneResponePage(int page) {
return webClientInstance
.get()
.uri(uriBuilder -> {
uriBuilder.queryParam("offset", page * LIMIT);
uriBuilder.queryParam("limit", LIMIT);
// …
})
.retrieve()
.bodyToFlux(ResponseItem.class);
}发布于 2022-06-16 17:14:47
Flux<Flux<T>>。另一个反模式是显式订阅(innerFlux.hasElements().subscribe)。理想情况下,您只需要订阅一次,通常是在框架层(例如,WebFlux在下划线HTTP服务器中订阅)。使用不断增加的指针(页码、偏移量等)查询数据是一种非常常见的模式,您可以使用expand操作符来实现它。在Flux的情况下,它将尝试扩展每个元素。对于分页,通常Mono<List<T>>更有用。展开将尝试展开每个页面,并在getOneResponePage返回Mono.empty()时停止。
private Flux<Integer> getAllResponses() {
var page = new AtomicInteger(0); // initial page
return getOneResponePage(page.get())
.expand(list -> getOneResponePage(page.incrementAndGet()))
.flatMapIterable(Function.identity());
}
private Mono<List<Integer>> getOneResponePage(int page) {
System.out.println("Request for page " + page);
// there's only content on the first 3 pages
if (page < 3) {
return Flux
.just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
.map(i -> (1000 * page) + i)
.collectList();
}
return Mono.empty();
}parallel调度程序上订阅。应该使用boundedElastic来“卸载”阻塞任务。使用.subscribeOn(Schedulers.parallel())。有关更多详细信息,您可以查看Difference between boundedElastic() vs parallel() scheduler发布于 2022-06-16 15:36:12
没有直接的方法阻止从内部流动的外部流动。最接近的方法是在内部序列上使用switchIfEmpty和一个Flux.error(NoSuchElementException),然后在外部序列上使用onErrorResumeNext,如果发现了NoSuchElementException,则返回一个空的Flux。
Flux.just(listOf(1, 2, 3), listOf(), listOf(4, 5, 6))
.flatMap(list ->
Flux.fromIterable(list)
.switchIfEmpty(Flux.error(new NoSuchElementException()))
)
.onErrorResumeNext(e ->
e instanceof NoSuchElementException ?
Flux.empty() : Flux.error(e)
);https://stackoverflow.com/questions/72647698
复制相似问题