首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >反应性扩展和/或参与者模型如何简化流程编排?

反应性扩展和/或参与者模型如何简化流程编排?
EN

Stack Overflow用户
提问于 2015-01-02 18:33:21
回答 1查看 597关注 0票数 3

我正在实现一个模块,在该模块中,我的主进程生成一组并行和连续的子进程(任务),以完成其工作。任务本身主要是从各种源获取数据并执行计算。有些是CPU,而另一些则是IO绑定的。

当前的实现在多个步骤中使用Java Executor/Completion服务来实现这一点。该流程工作流的一个示例可以描述如下:

Task0 -> TaskA2

更近的、更近的、更高的、更高的A3、->、任务、B1、->的任务C(将所有任务的结果结合起来产生输出)

B2 /T1589.2-1988商品价格、商品、商品等商品

任务A1-A4并行运行,任务B1B2也并行运行。最后,任务C依赖于AB的所有任务来编译最终的输出。

使用Executor服务构建这个程序似乎不太干净,我一直在寻找更好的方法来实现这一点,因为这些任务依赖可能会随着时间的推移而改变或增加复杂性,让FuturesCallable来管理它们会随着时间的推移变得更加丑陋。

我一直在探索这个主题,并遇到了、反应性扩展、参与者模型框架。Akka似乎对此有点过份了,而高级别的RxJava似乎是一种合理的适合,因为它的流/基于事件的处理模式将简化设计并使其更易于扩展。

RxJava线程示例中的一些例子看起来也很有希望。

我来这里是为了寻求社区的一些建议,比如这是否是正确的方法,以及是否有其他方法/更好的框架来解决这些问题。

=====================================================================================================

使用JGraphT编写了以下代码,但仍然需要弄清楚如何重用线程池。在本例中,我最终为每个请求创建了新的线程执行程序。在这里张贴代码的主要部分,这应该给出一个方法的想法。

代码语言:javascript
复制
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.jgrapht.DirectedGraph;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.traverse.TopologicalOrderIterator;

public class GraphTaskExecutor {

    ThreadExecutor executor;
    private List<Result> results;
    private List<TaskInfo> log;
    private DirectedGraph<GraphTask, DefaultEdge> graph;
    Set<GraphTask> executing;

    public GraphTaskExecutor() {
        executor = new ThreadExecutor(Runtime.getRuntime()
                .availableProcessors() * 4, 60,
                new LinkedBlockingQueue<Runnable>());
        results = new ArrayList<Result>();
        log = new ArrayList<TaskInfo>();
        executing = new HashSet<GraphTask>();
    }

    public List<Result> execute(Request request, List<GraphTask> tasks) {
        System.out.println("Preparing task runner. Num Tasks: " + tasks.size());
        graph = new GraphTaskBuilder(tasks).buildGraph();
        processTasks();
        awaitCompletion();
        return results;
    }

    private void awaitCompletion() {
        try {
            executor.awaitTermination(3, TimeUnit.DAYS);
            System.out.println("Results " + results.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void processTasks() {
        if (graph.vertexSet().size() == 0) {
            executor.shutdown();
            System.out
                    .println("All tasks completed... shutting down executor service");
        } else {
            synchronized (graph) {
                Iterator<GraphTask> iter = new TopologicalOrderIterator<GraphTask, DefaultEdge>(
                        graph);
                while (iter.hasNext()) {
                    GraphTask task = iter.next();
                    if (graph.incomingEdgesOf(task).size() == 0
                            && !executing.contains(task)) {
                        executor.execute(task);
                        executing.add(task);
                    }
                }
            }
        }
    }

    private void completed(GraphTask t) {
        System.out.println("Completed Task: " + t.getName());
        synchronized (graph) {
            for (DefaultEdge edge : graph.outgoingEdgesOf(t)) {
                GraphTask target = graph.getEdgeTarget(edge);
                target.addData(t.getData());
            }
            if (t.isEndPoint())
                results.add(t.getResult());
            graph.removeVertex(t);
            executing.remove(t);
        }
        processTasks();
    }

    private class ThreadExecutor extends ThreadPoolExecutor {
        public ThreadExecutor(int corePoolSize, long keepAliveSeconds,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, corePoolSize, keepAliveSeconds,
                    TimeUnit.SECONDS, workQueue);
        }

        @Override
        protected void beforeExecute(Thread thread, Runnable runTask) {
            super.beforeExecute(thread, runTask);
        }

        @Override
        protected void afterExecute(Runnable runTask, Throwable e) {
            super.afterExecute(runTask, e);
            completed((GraphTask) runTask);
        }

    }

    public static void main(String arg[]) throws Exception {
        GraphTaskExecutor graphTaskExecutor = new GraphTaskExecutor();
        TaskContext context = new TaskContext();
        List<GraphTask> tasks = new ArrayList<GraphTask>();
        Request request = new Request(1);

        Set<DataType> empty = new HashSet<DataType>();
        Set<DataType> producer = new HashSet<DataType>(Arrays.asList(
                DataType.ACCT_INFO, DataType.PROJECTIONS));
        Set<DataType> consumer = new HashSet<DataType>(Arrays.asList(
                DataType.ACCT_INFO, DataType.PROJECTIONS));
        Set<DataType> accountResult = new HashSet<DataType>(
                Arrays.asList(DataType.ACCT_INFO));
        Set<DataType> projectionResult = new HashSet<DataType>(
                Arrays.asList(DataType.PROJECTIONS));
        Set<DataType> intraDayResult = new HashSet<DataType>(
                Arrays.asList(DataType.PROJECTIONS));

        tasks.add(new GraphTask(context, "1", "A", producer, empty, empty));
        tasks.add(new GraphTask(context, "2", "X", producer, consumer, empty,
                "A"));
        tasks.add(new GraphTask(context, "3", "Y", producer, consumer,
                accountResult, "A"));
        tasks.add(new GraphTask(context, "4", "B", producer, consumer, empty,
                "A"));
        tasks.add(new GraphTask(context, "5", "C", producer, consumer, empty,
                "B"));
        tasks.add(new GraphTask(context, "6", "D", producer, consumer,
                intraDayResult, "C"));
        tasks.add(new GraphTask(context, "7", "E", producer, consumer,
                projectionResult, "D", "X", "Y"));

        graphTaskExecutor.execute(request, tasks);
        System.out.println("All DONE");

    }

}


import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class GraphTask extends AbstractTask {

    private Set<String> dependencies = new TreeSet<String>();

    public GraphTask(TaskContext context, String id, String name,
            Set<DataType> produces, Set<DataType> consumes,
            Set<DataType> endpoints, String... dependency) {
        super(id, name, context, produces, consumes, endpoints);
        dependencies.addAll(Arrays.asList(dependency));
    }

    public GraphTask(TaskContext context, String id, String name,
            Set<DataType> produces, Set<DataType> consumes,
            Set<DataType> endpoints) {
        super(id, name, context, produces, consumes, endpoints);
    }

    public void addDependency(String dependency) {
        this.dependencies.add(dependency);
    }

    public Data process(TaskContext context, Data data) throws TaskException {
        int time = (int) (Math.random() * 10);
        System.out.println("Task " + getName() + " estimated to run for "
                + time + " secs");
        TaskResult result = null;
        try {
            Thread.sleep(time * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        result = new TaskResult(getName());
        for (DataType d: getProduces()) {
            result.addData(d, d.toString());
        }
        return result;
    }

    public Set<String> getDependencies() {
        return dependencies;
    }

    public void setDependencies(Set<String> dependencies) {
        this.dependencies = dependencies;
    }

}

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public abstract class AbstractTask implements Task<Data, Result> {

    private String identifier;
    private String name;
    private TaskContext context;
    private List<Data> prevData;
    private Data data;
    private Set<DataType> produces;
    private Set<DataType> consumes;
    private Set<DataType> endpoints;
    private TaskStatus status;
    private LocalTime startTime;
    private LocalTime endTime;
    private Result result;

    public AbstractTask(String id, String name, TaskContext context,
            Set<DataType> produces, Set<DataType> consumes,
            Set<DataType> endpoints) {
        this.identifier = id;
        this.name = name;
        this.context = context;
        this.consumes = consumes;
        this.produces = produces;
        this.endpoints = endpoints;
        this.data = new Data();
        this.prevData = new ArrayList<Data>();
        this.status = TaskStatus.SUCCESS;
    }

    public AbstractTask(String id, String name, TaskContext context) {
        this(id, name, context, new HashSet<DataType>(),
                new HashSet<DataType>(), new HashSet<DataType>());
    }

    public String getIdentifier() {
        return identifier;
    }

    public void setIdentifier(String identifier) {
        this.identifier = identifier;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public TaskContext getContext() {
        return context;
    }

    public void setContext(TaskContext context) {
        this.context = context;
    }

    public boolean isEndPoint() {
        return (endpoints.size() > 0);
    }

    private Data preProcess() throws MissingDataException,
            SkippedTaskException, ErrorTaskException {
        Map<TaskStatus, Data> statusData = new HashMap<TaskStatus, Data>();
        Map<DataType, Object> allData = new HashMap<DataType, Object>();

        // Get all data from previous results
        for (Data r : prevData) {
            statusData.put(r.getStatus(), r);
            allData.putAll(r.getObjects());
        }

        Data data = new Data();
        data.addData(allData);
        data.setStatus(deriveTaskStatus(statusData));

        switch (data.getStatus()) {
        case SUCCESS:
            for (DataType d : consumes) {
                if (!allData.containsKey(d)) {
                    throw new MissingDataException("Task " + name + " Missing input data for "
                            + d);
                }
            }
            break;
        case SKIPPED:
            throw new SkippedTaskException("Previous Task was skipped");
        case ERROR:
            throw new ErrorTaskException("Previous Task failed");
        }
        return data;
    }

    private TaskStatus deriveTaskStatus(Map<TaskStatus, Data> statusData) {
        if (statusData.containsKey(TaskStatus.ERROR))
            return TaskStatus.ERROR;
        if (statusData.containsKey(TaskStatus.SKIPPED))
            return TaskStatus.SKIPPED;
        return TaskStatus.SUCCESS;
    }

    private Result postProcess(Data outputData) throws MissingDataException {
        Result result = new Result();
        for (DataType d : endpoints) {
            if (!outputData.getObjects().containsKey(d)) {
                throw new MissingDataException("Missing end point data for " + d);
            }
            result.addData(d, outputData.getObject(d));
        }
        return result;
    }

    @Override
    public void run() {
        System.out.println("Running task: " + name);
        try {
            Data inputData = preProcess();
            data = process(context, inputData);
            result = postProcess(data);
        } catch (MissingDataException | SkippedTaskException
                | ErrorTaskException e) {
            data = new Data(TaskStatus.SKIPPED, new Error("SKIP_TASK",
                    "Skip Task", e));
            e.printStackTrace();
        } catch (TaskException e) {
            data = new Data(TaskStatus.ERROR, new Error("PREV_ERROR",
                    "Error in dependent task", e));
            e.printStackTrace();
        }
    }

    public abstract Data process(TaskContext context, Data data)
            throws TaskException;

    @Override
    public void addData(Data data) {
        this.prevData.add(data);
    }

    public TaskStatus getStatus() {
        return status;
    }

    public void setStatus(TaskStatus status) {
        this.status = status;
    }

    public Result getResult() {
        return result;
    }

    public Set<DataType> getProduces() {
        return produces;
    }

    public void setProduces(Set<DataType> produces) {
        this.produces = produces;
    }

    public Set<DataType> getConsumes() {
        return consumes;
    }

    public void setConsumes(Set<DataType> consumes) {
        this.consumes = consumes;
    }

    public Data getData() {
        return data;
    }

    @Override
    public String toString() {
        return "[" + name + "]";
    }

    @Override
    public TaskInfo getTaskInfo() {
        return new TaskInfo(this.identifier, this.name, this.status,
                this.startTime, this.endTime);
    }

    public Set<DataType> getEndpoints() {
        return endpoints;
    }

    public void setEndpoints(Set<DataType> endpoints) {
        this.endpoints = endpoints;
    }

}
EN

回答 1

Stack Overflow用户

发布于 2015-01-02 23:18:20

在这里,Rx将是一个很好的选择,因为它的目的是在应用程序中构建复杂的消息流、叉和连接并行执行(只是另一个编译依赖项)。虽然与CPU绑定的任务可能会以简单的回调结束,但IO绑定需要异步IO支持(您也可以使用rx自己完成)。

Akka和Vert.x是构建应用程序的完整框架,而Rx只是一个库,可以将异步函数编程的优点引入应用程序。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27746842

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档