首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用FutureTask进行并发

使用FutureTask进行并发
EN

Stack Overflow用户
提问于 2015-03-02 13:02:25
回答 2查看 1.5K关注 0票数 7

我有这样的服务:

代码语言:javascript
复制
class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}

现在我希望它更快,我发现一些过滤器可以同时启动,而一些过滤器必须等待其他过滤器完成。例如:

代码语言:javascript
复制
filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--

这意味着:

1.过滤器1和filter2可以同时启动,filter3和filter4也可以同时启动

2.过滤器3和filter4必须等待filter2完成

还有一件事

如果filter2返回true,则‘filter2’方法将立即返回,并忽略以下筛选器。

现在我的解决方案是使用FutureTask:

代码语言:javascript
复制
            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }

我的CallableFilter:

代码语言:javascript
复制
public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}

我想知道:

1.在这种情况下使用FutureTask是个好主意吗?有没有更好的解决办法?

2.线程上下文切换的开销。

谢谢!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-03-02 18:40:00

在Java8中,您可以使用CompletableFuture来彼此链接过滤器。使用thenApply和thenCompose系列的方法,以便向CompletableFuture添加新的异步过滤器--它们将在完成前一步之后执行。thenCombine合并了两个独立的CompletableFutures,当两者都完成时。使用allOf等待两个以上CompletableFuture对象的结果。

如果您不能使用Java 8,那么番石榴ListenableFuture也可以这样做,参见可列表未来解释。使用番石榴,您可以等待多个独立运行的过滤器使用Futures.allAsList完成--这也会返回一个ListenableFuture。

这两种方法的思想都是,在声明了未来的操作、它们之间的依赖关系以及它们的线程之后,您将得到一个未来对象,它封装了您的最终结果。

编辑:早期返回可以通过使用complete()方法显式完成CompletableFuture或使用SettableFuture (实现ListenableFuture)来实现。

票数 6
EN

Stack Overflow用户

发布于 2015-03-03 10:00:26

您可以使用ForkJoinPool进行并行化,这是对这种并行计算的解释:

(...)方法join()及其变体只适用于完成依赖关系为非循环的情况下使用;也就是说,并行计算可以描述为有向无圈图(DAG) (.)

(见ForkJoinTask)

ForkJoinPool的优点是,每个任务都可以生成新任务,也可以等待它们完成,而不实际阻塞执行线程(否则,如果等待其他任务完成的任务多于可用的线程,则可能导致死锁)。

到目前为止,这是一个应该起作用的例子,尽管它还存在一些限制:

  1. 它忽略了过滤结果。
  2. 如果筛选器2返回true,它不会过早地完成执行
  3. 未实现异常处理

这段代码背后的主要思想是:每个过滤器都表示为可能依赖于其他节点的Node (=必须在此过滤器执行之前完成的筛选器)。依赖节点被派生为并行任务。

代码语言:javascript
复制
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Node<V> extends RecursiveTask<V> {
    private static final short VISITED = 1;

    private final Callable<V> callable;
    private final Set<Node<V>> dependencies = new HashSet<>();

    @SafeVarargs
    public Node(Callable<V> callable, Node<V>... dependencies) {
        this.callable = callable;
        this.dependencies.addAll(Arrays.asList(dependencies));
    }

    public Set<Node<V>> getDependencies() {
        return this.dependencies;
    }

    @Override
    protected V compute() {
        try {
            // resolve dependencies first
            for (Node<V> node : dependencies) {
                if (node.tryMarkVisited()) {
                    node.fork(); // start node
                }
            }

            // wait for ALL nodes to complete
            for (Node<V> node : dependencies) {
                node.join();
            }

            return callable.call();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }

    public boolean tryMarkVisited() {
        return compareAndSetForkJoinTaskTag((short) 0, VISITED);
    }
}

用法示例:

代码语言:javascript
复制
public static void main(String[] args) {
    Node<Void> filter1 = new Node<>(filter("filter1"));
    Node<Void> filter2 = new Node<>(filter("filter2"));
    Node<Void> filter3 = new Node<>(filter("filter3"), filter1, filter2);
    Node<Void> filter4 = new Node<>(filter("filter4"), filter1, filter2);
    Node<Void> filter5 = new Node<>(filter("filter5"), filter3, filter4);
    Node<Void> root = new Node<>(() -> null, filter5);

    ForkJoinPool.commonPool().invoke(root);
}

public static Callable<Void> filter(String name) {
    return () -> {
        System.out.println(Thread.currentThread().getName() + ": start " + name);
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": end   " + name);
        return null;
    };
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28810474

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档