我在为一个需要协调不同任务的程序实现一个优雅的函数式解决方案时遇到了麻烦。这是我想要实现的目标。
我有三个类,我想编排它们的方法(为了简明起见):
class TaskA {
public ResultA call() {
return new ResultA();
}
}
class TaskB {
public ResultB call(ResultA a) {
return new ResultB();
}
}
class TaskC {
public ResultC call(List<ResultB> resultBs) {
return new ResultC();
}
}我需要并行执行TaskA 'n‘次,对于每次执行TaskA,我需要使用相应TaskA的结果执行TaskB 'n’次。最后,我需要使用所有TaskB调用的结果执行一次TaskC。
实现这一点的一种方法是创建一个Callable,它封装了对TaskA和TaskB的调用,最后在我的主线程中收集ResultB的List来执行ResultB
class TaskATaskBCallable implements Callable<ResultB> {
private TaskA taskA ...;
private TaskB taskB ...;
public ResultB call() {
return taskB.call(taskA.call());
}
}在我的主线中:
private ResultC orchestrate() {
ExecutorService service = ...;
List<Callable<ResultB>> callables = ...;
taskC.call(callables.map(callable ->
service.submit(callable)).map(Future::get).collect(Collectors.toList());
}我不喜欢这个解决方案的一点是TaskATaskBCallable。这可能是一个不必要的耦合TaskA和TaskB的类。此外,如果我必须将另一个任务链接到TaskA和TaskB,我将不得不修改TaskATaskBCallable,可能还需要修改它的名称。我觉得我可以通过更聪明地使用像CompletableFuture或Phaser这样的Java并发库类来摆脱它。
有什么建议吗?
发布于 2016-08-01 17:36:42
我找到了一种使用CompletableFuture实现此目的的方法
private ResultC orchestrate() {
ExecutorService service = ...;
int taskCount = ...;
List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount)
.mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service))
.map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA),
service))
.collect(Collectors.toList());
return new TaskC().call(CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()]))
.thenApply(v -> resultBFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList()))
.join());
}发布于 2016-08-02 16:40:54
我认为CompletableFuture确实会被证明是最优雅的:
int taskCount = 100;
List<ResultB> resultBs = IntStream.range(0, taskCount)
.mapToObj(i -> new TaskA())
.map(taskA -> CompletableFuture.supplyAsync(taskA::call))
.map(completableFutureA -> completableFutureA.thenApplyAsync(new TaskB()::call))
.collect(Collectors.toList()) // collect, in order to kick off the async tasks
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return new TaskC().call(resultBs);https://stackoverflow.com/questions/38687963
复制相似问题