首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何通过使用ExecutorService和CompletableFuture控制线程的数量,在spring flux中立即向客户端返回响应?

如何通过使用ExecutorService和CompletableFuture控制线程的数量,在spring flux中立即向客户端返回响应?
EN

Stack Overflow用户
提问于 2020-01-14 02:32:50
回答 1查看 1.4K关注 0票数 0

我需要使用基于Spring flux的my rest服务API中的非阻塞io并行调用两个下游系统。但是第一下行系统容量是一次10个请求,并且第二下行系统容量是100。

第一个下游系统out被输入到第二个下游系统,因此我可以向第二个系统发出更并行的请求,以加快过程。

第二个下游系统响应非常大,因此无法在内存中具体保存所有响应,因此立即希望将响应返回给客户端。

Ex工作流:

示例代码:

代码语言:javascript
复制
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {

    ExecutorService executor = Executors.newFixedThreadPool(10);

    List<CompletableFuture> list = new ArrayList<>();

    AtomicInteger ai = new AtomicInteger(1);
    RestTemplate restTemplate = new RestTemplate();

    for (int i = 0; i < 100; i++) {
        CompletableFuture<Object> cff = CompletableFuture.supplyAsync(

                () -> ai.getAndAdd(1) + " first downstream web service " +
                        restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)

        ).thenApplyAsync(v -> {

            Random r = new Random();
            Integer in = r.nextInt(1000);

            return v + " second downstream web service  " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
        }, executor);

        list.add(cff);
    }

    return Flux.fromStream(list.stream().map(m -> {
                try {
                    return m.get().toString();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                return "";
            })
    );

}

这段代码只适用于前五个线程,在我得到响应后,所有线程都完成了这个过程。但是,一旦我从第二个下游系统获得响应,我就需要立即获得对客户端的响应。

注意:上面的代码不是用二级线程池实现的。

提前谢谢你。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-14 04:04:38

如果您正在使用Spring-Webflux构建非阻塞系统,最好在示例中利用WebClient的功能。我已经创建了一个简单的测试应用程序,其中以下代码片段适用于我:

代码语言:javascript
复制
private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system


@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
    return Flux
            .range(0, 100) // prepare initial 100 requests
            .window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)

            // .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
            .doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready

            .flatMap(flow -> flow
                    // perform an external async call for each element in batch of 10
                    // they will be executed sequentially but there will not be any performance issues because
                    // calls are async. If you wish you can add .parallel() to the flow to make it parallel
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )

            // subscribe to each response and throw received element further to the stream
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)))

            .window(1000) // batch of 1000 is ready
            .flatMap(flow -> flow
                    .flatMap(element -> w.get().exchange())
                    .map(r -> r.bodyToMono(MyClass.class))
            )
            .flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}

public static class MyClass {
    public Integer i;
}

更新:

我准备了一个小申请来复制你的案例。您可以在my repository中找到它。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59722242

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档