首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Flux<Flux<T>>中,如果内部Flux<T>最终为空,如何完成外部通量

在Flux<Flux<T>>中,如果内部Flux<T>最终为空,如何完成外部通量
EN

Stack Overflow用户
提问于 2022-06-16 14:42:55
回答 2查看 264关注 0票数 0

在这些条件下考虑以下代码:

getOneResponePage(int)生成一个Flux<Integer>。它模拟从另一个服务中获取结果页的请求。它的实现应该可以被当作黑匣子来回答我的问题。(关于它的实际用途,请参见下面的说明。)

getOneResponePage(int)最终将返回一个空的Flux<Integer>,这是表示不会出现更多结果的方式。(但它将继续发射空的Flux<Integer>。)

代码语言:javascript
复制
package ch.cimnine.test;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class PaginationTest {
    @Test
    public void main() {
        final Flux<Integer> finalFlux = getAllResponses();

        finalFlux.subscribe(resultItem -> {
            try {
                Thread.sleep(200); // Simulate heavy processing
            } catch (InterruptedException ignore) {
            }

            System.out.println(resultItem);
        });
    }

    private Flux<Integer> getAllResponses() {
        Flux<Flux<Integer>> myFlux = Flux.generate(
            () -> 0, // inital page
            (page, sink) -> {
                var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()

                // my way to check whether the `innerFlux` is now empty
                innerFlux.hasElements().subscribe(
                    hasElements -> {
                        if (hasElements) {
                            System.out.println("hasElements=true");
                            sink.next(innerFlux);
                            return;
                        }

                        System.out.println("hasElements=false");
                        sink.complete();
                    }
                );

                return page + 1;
            }
        );

        return Flux.concat(myFlux);
    }

    private Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        
        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i);
        }

        return Flux.empty();
    }
}

目标

目标是有一个名为getAllResponses()的方法,它返回结果的连续Flux<T>。调用方不应该知道--或者不关心--某些分页逻辑在内部发生。所有其他方法都将对调用方隐藏。

问题

由于我刚开始使用反应性编程,我是否认为这个right?

  • IntelliJ警告我不建议在非阻塞上下文中调用“订阅”。如何做对?

getOneResponePage(int)到底是什么?

在我的实际代码中,getOneResponsePage(int)使用org.springframework.web.reactive.function.client.WebClient发送请求。它连接到正在返回结果的服务。该服务每次调用只返回1000个结果。必须发送offset参数才能获得更多的结果。

这个API有点奇怪,因为要确定是否有所有结果,唯一的方法是使用一个不断增加的offset反复查询它,直到得到一个空的结果集。它将很高兴地为一个仍在增长的offset (…)返回更多的空结果集。直到达到offset的某些内部最大值并返回一个400 Bad Request为止。)

getOneResponePage(int)的实际实现几乎与此相同:

代码语言:javascript
复制
private Flux<ResponseItem> getOneResponePage(int page) {
    return webClientInstance
        .get()
        .uri(uriBuilder -> {
            uriBuilder.queryParam("offset", page * LIMIT);
            uriBuilder.queryParam("limit", LIMIT);
            // …
        })
        .retrieve()
        .bodyToFlux(ResponseItem.class);
}
EN

回答 2

Stack Overflow用户

发布于 2022-06-16 17:14:47

  1. 试图避免Flux<Flux<T>>。另一个反模式是显式订阅(innerFlux.hasElements().subscribe)。理想情况下,您只需要订阅一次,通常是在框架层(例如,WebFlux在下划线HTTP服务器中订阅)。

使用不断增加的指针(页码、偏移量等)查询数据是一种非常常见的模式,您可以使用expand操作符来实现它。在Flux的情况下,它将尝试扩展每个元素。对于分页,通常Mono<List<T>>更有用。展开将尝试展开每个页面,并在getOneResponePage返回Mono.empty()时停止。

代码语言:javascript
复制
private Flux<Integer> getAllResponses() {
    var page = new AtomicInteger(0); // initial page
    return getOneResponePage(page.get())
            .expand(list -> getOneResponePage(page.incrementAndGet()))
            .flatMapIterable(Function.identity());
}


private Mono<List<Integer>> getOneResponePage(int page) {
        System.out.println("Request for page " + page);

        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                    .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                    .map(i -> (1000 * page) + i)
                    .collectList();
        }

        return Mono.empty();
    }

  1. ,如果您的流是非阻塞的,您需要在parallel调度程序上订阅。应该使用boundedElastic来“卸载”阻塞任务。使用.subscribeOn(Schedulers.parallel())。有关更多详细信息,您可以查看Difference between boundedElastic() vs parallel() scheduler
票数 1
EN

Stack Overflow用户

发布于 2022-06-16 15:36:12

没有直接的方法阻止从内部流动的外部流动。最接近的方法是在内部序列上使用switchIfEmpty和一个Flux.error(NoSuchElementException),然后在外部序列上使用onErrorResumeNext,如果发现了NoSuchElementException,则返回一个空的Flux

代码语言:javascript
复制
Flux.just(listOf(1, 2, 3), listOf(), listOf(4, 5, 6))
.flatMap(list ->
     Flux.fromIterable(list)
     .switchIfEmpty(Flux.error(new NoSuchElementException()))
)
.onErrorResumeNext(e -> 
      e instanceof NoSuchElementException ?
      Flux.empty() : Flux.error(e)
);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72647698

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档