我有这样的服务:
class DemoService {
Result process(Input in) {
filter1(in);
if (filter2(in)) return...
filter3(in);
filter4(in);
filter5(in);
return ...
}
}现在我希望它更快,我发现一些过滤器可以同时启动,而一些过滤器必须等待其他过滤器完成。例如:
filter1--
|---filter3--
filter2-- |---filter5
---filter4--这意味着:
1.过滤器1和filter2可以同时启动,filter3和filter4也可以同时启动
2.过滤器3和filter4必须等待filter2完成
还有一件事
如果filter2返回true,则‘filter2’方法将立即返回,并忽略以下筛选器。
现在我的解决方案是使用FutureTask:
// do filter's work at FutureTask
for (Filter filter : filters) {
FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
executorService.execute(futureTask);
}
//when all FutureTask are submitted, wait for result
for(Filter filter : filters) {
if (filter.isReturnNeeded()) {
FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
riskResult = futureTask.get();
if (canReturn(filter, riskResult)) {
returnOk = true;
return riskResult;
}
}
}我的CallableFilter:
public class CallableFilter implements Callable<RiskResult> {
private Filter filter;
private Context context;
@Override
public RiskResult call() throws Exception {
List<Filter> dependencies = filter.getDependentFilters();
if (dependencies != null && dependencies.size() > 0) {
//wait for its dependency filters to finish
for (Filter d : dependencies) {
FutureTask<RiskResult> futureTask = context.getTask(d.getId());
futureTask.get();
}
}
//do its own work
return filter.execute(context);
}
}我想知道:
1.在这种情况下使用FutureTask是个好主意吗?有没有更好的解决办法?
2.线程上下文切换的开销。
谢谢!
发布于 2015-03-02 18:40:00
在Java8中,您可以使用CompletableFuture来彼此链接过滤器。使用thenApply和thenCompose系列的方法,以便向CompletableFuture添加新的异步过滤器--它们将在完成前一步之后执行。thenCombine合并了两个独立的CompletableFutures,当两者都完成时。使用allOf等待两个以上CompletableFuture对象的结果。
如果您不能使用Java 8,那么番石榴ListenableFuture也可以这样做,参见可列表未来解释。使用番石榴,您可以等待多个独立运行的过滤器使用Futures.allAsList完成--这也会返回一个ListenableFuture。
这两种方法的思想都是,在声明了未来的操作、它们之间的依赖关系以及它们的线程之后,您将得到一个未来对象,它封装了您的最终结果。
编辑:早期返回可以通过使用complete()方法显式完成CompletableFuture或使用SettableFuture (实现ListenableFuture)来实现。
发布于 2015-03-03 10:00:26
您可以使用ForkJoinPool进行并行化,这是对这种并行计算的解释:
(...)方法join()及其变体只适用于完成依赖关系为非循环的情况下使用;也就是说,并行计算可以描述为有向无圈图(DAG) (.)
(见ForkJoinTask)
ForkJoinPool的优点是,每个任务都可以生成新任务,也可以等待它们完成,而不实际阻塞执行线程(否则,如果等待其他任务完成的任务多于可用的线程,则可能导致死锁)。
到目前为止,这是一个应该起作用的例子,尽管它还存在一些限制:
true,它不会过早地完成执行这段代码背后的主要思想是:每个过滤器都表示为可能依赖于其他节点的Node (=必须在此过滤器执行之前完成的筛选器)。依赖节点被派生为并行任务。
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Node<V> extends RecursiveTask<V> {
private static final short VISITED = 1;
private final Callable<V> callable;
private final Set<Node<V>> dependencies = new HashSet<>();
@SafeVarargs
public Node(Callable<V> callable, Node<V>... dependencies) {
this.callable = callable;
this.dependencies.addAll(Arrays.asList(dependencies));
}
public Set<Node<V>> getDependencies() {
return this.dependencies;
}
@Override
protected V compute() {
try {
// resolve dependencies first
for (Node<V> node : dependencies) {
if (node.tryMarkVisited()) {
node.fork(); // start node
}
}
// wait for ALL nodes to complete
for (Node<V> node : dependencies) {
node.join();
}
return callable.call();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
public boolean tryMarkVisited() {
return compareAndSetForkJoinTaskTag((short) 0, VISITED);
}
}用法示例:
public static void main(String[] args) {
Node<Void> filter1 = new Node<>(filter("filter1"));
Node<Void> filter2 = new Node<>(filter("filter2"));
Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
Node<Void> root = new Node<>(() -> null, filter5);
ForkJoinPool.commonPool().invoke(root);
}
public static Callable<Void> filter(String name) {
return () -> {
System.out.println(Thread.currentThread().getName() + ": start " + name);
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + ": end " + name);
return null;
};
}https://stackoverflow.com/questions/28810474
复制相似问题