我需要使用基于Spring flux的my rest服务API中的非阻塞io并行调用两个下游系统。但是第一下行系统容量是一次10个请求,并且第二下行系统容量是100。
第一个下游系统out被输入到第二个下游系统,因此我可以向第二个系统发出更并行的请求,以加快过程。
第二个下游系统响应非常大,因此无法在内存中具体保存所有响应,因此立即希望将响应返回给客户端。
Ex工作流:

示例代码:
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getstream() {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture> list = new ArrayList<>();
AtomicInteger ai = new AtomicInteger(1);
RestTemplate restTemplate = new RestTemplate();
for (int i = 0; i < 100; i++) {
CompletableFuture<Object> cff = CompletableFuture.supplyAsync(
() -> ai.getAndAdd(1) + " first downstream web service " +
restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get(), String.class)
).thenApplyAsync(v -> {
Random r = new Random();
Integer in = r.nextInt(1000);
return v + " second downstream web service " + in + " " + restTemplate.getForObject("http://dummy.restapiexample.com/api/v1/employee/" + ai.get() + 1, String.class) + " \n";
}, executor);
list.add(cff);
}
return Flux.fromStream(list.stream().map(m -> {
try {
return m.get().toString();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "";
})
);
}这段代码只适用于前五个线程,在我得到响应后,所有线程都完成了这个过程。但是,一旦我从第二个下游系统获得响应,我就需要立即获得对客户端的响应。
注意:上面的代码不是用二级线程池实现的。
提前谢谢你。
发布于 2020-01-14 04:04:38
如果您正在使用Spring-Webflux构建非阻塞系统,最好在示例中利用WebClient的功能。我已经创建了一个简单的测试应用程序,其中以下代码片段适用于我:
private final WebClient w = WebClient.create("http://localhost:8080/call"); // web client for external system
@GetMapping(path = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyClass> getstream() {
return Flux
.range(0, 100) // prepare initial 100 requests
.window(10) // combine elements in batch of 10 (probably buffer will fit better, have a look)
// .delayElements(Duration.ofSeconds(5)) for testing purpose you can use this function as well
.doOnNext(flow -> log.info("Batch of 10 is ready")) // double check tells that batch is ready
.flatMap(flow -> flow
// perform an external async call for each element in batch of 10
// they will be executed sequentially but there will not be any performance issues because
// calls are async. If you wish you can add .parallel() to the flow to make it parallel
.flatMap(element -> w.get().exchange())
.map(r -> r.bodyToMono(MyClass.class))
)
// subscribe to each response and throw received element further to the stream
.flatMap(response -> Mono.create(s -> response.subscribe(s::success)))
.window(1000) // batch of 1000 is ready
.flatMap(flow -> flow
.flatMap(element -> w.get().exchange())
.map(r -> r.bodyToMono(MyClass.class))
)
.flatMap(response -> Mono.create(s -> response.subscribe(s::success)));
}
public static class MyClass {
public Integer i;
}更新:
我准备了一个小申请来复制你的案例。您可以在my repository中找到它。
https://stackoverflow.com/questions/59722242
复制相似问题