首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >确保Java执行框架中并发任务的副作用的可见性

确保Java执行框架中并发任务的副作用的可见性
EN

Stack Overflow用户
提问于 2015-02-10 18:01:34
回答 1查看 106关注 0票数 0

我正在试验一些技术,以确保使用Java框架执行的并发任务所完成的副作用的可见性。作为一个简单的场景,考虑了矩阵乘法的一个假设问题。

假设要乘的矩阵相当大(例如,几千行和列),为了加速这些矩阵的乘法,我实现了一种并行算法,其中计算结果矩阵中的每个单元格被认为是一个独立的(即可并行的)任务。为了简化一点,让我们忽略一下,对于小输入矩阵,这种并行化可能不是一个好主意。

因此,下面是我的程序的第一个版本:

代码语言:javascript
复制
public class MatrixMultiplier {

    private final int[][] m;
    private final int[][] n;
    private volatile int[][] result; //the (lazily computed) result of the matrix multiplication

    private final int numberOfMRows; //number of rows in M
    private final int numberOfNColumns; //number of columns in N
    private final int commonMatrixDimension; //number of columns in M and rows in N

    public MatrixMultiplier(int[][] m, int[][] n) {
        if(m[0].length != n.length)
            throw new IllegalArgumentException("Uncompatible arguments: " + Arrays.toString(m) + " and " + Arrays.toString(n));
        this.m = m;
        this.n = n;
        this.numberOfMRows = m.length;
        this.numberOfNColumns = n[0].length;
        this.commonMatrixDimension = n.length;
    }

    public synchronized int[][] multiply() {
        if (result == null) {
            result = new int[numberOfMRows][numberOfNColumns];

            ExecutorService executor = createExecutor();

            Collection<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < numberOfMRows; i++) {
                final int finalI = i;
                for (int j = 0; j < numberOfNColumns; j++) {
                    final int finalJ = j;
                    tasks.add(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            calculateCell(finalI, finalJ);
                            return null;
                        }
                    });
                }
            }

            try {
                executor.invokeAll(tasks);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdownNow();
            }

        }

        return result;
    }

    private ExecutorService createExecutor() {
        final int availableProcessors = Runtime.getRuntime().availableProcessors();
        final int processorsBound = availableProcessors + 1;
        final int maxConcurrency = numberOfMRows * numberOfNColumns;
        final int threadPoolSize = maxConcurrency < processorsBound ? maxConcurrency : processorsBound;
        return Executors.newFixedThreadPool(threadPoolSize);
    }

    private void calculateCell(int mRow, int nColumn) {
        int sum = 0;
        for (int k = 0; k < commonMatrixDimension; k++) {
            sum += m[mRow][k] * n[k][nColumn];
        }
        result[mRow][nColumn] = sum;
    }

}

据我所知,这个实现有一个问题:执行的任务对result矩阵的一些修改不一定对调用multiply()的线程可见。

假设前面的内容是正确的,那么考虑依赖显式锁的multiply()的替代实现(与锁相关的新代码用//<LRC>注释):

代码语言:javascript
复制
    public synchronized int[][] multiply() {
        if (result == null) {
            result = new int[numberOfMRows][numberOfNColumns];

            final Lock[][] locks = new Lock[numberOfMRows][numberOfNColumns]; //<LRC>
            for (int i = 0; i < numberOfMRows; i++) { //<LRC>
                for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
                    locks[i][j] = new ReentrantLock(); //<LRC>
                } //<LRC>
            } //<LRC>

            ExecutorService executor = createExecutor();

            Collection<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < numberOfMRows; i++) {
                final int finalI = i;
                for (int j = 0; j < numberOfNColumns; j++) {
                    final int finalJ = j;
                    tasks.add(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            try { //<LRC>
                                locks[finalI][finalJ].lock(); //<LRC>
                                calculateCell(finalI, finalJ);
                            } finally { //<LRC>
                                locks[finalI][finalJ].unlock(); //<LRC>
                            } //<LRC>
                            return null;
                        }
                    });
                }
            }

            try {
                executor.invokeAll(tasks);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdownNow();
            }

            for (int i = 0; i < numberOfMRows; i++) { //<LRC>
                for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
                    locks[i][j].lock(); //<LRC>
                    locks[i][j].unlock(); //<LRC>
                } //<LRC>
            } //<LRC>
        }

        return result;
    }

使用上面的显式锁作为唯一的目标,以确保发布对调用线程的更改,因为不存在任何争用的可能性。

我的主要问题是,这是否是在我的场景中发布副作用问题的有效解决方案。

第二个问题是:是否有更有效/更优雅的方法来解决这个问题?请注意,我并不是在寻找并行化矩阵乘法的替代算法实现(例如Strassen算法),因为我的算法只是一个简单的案例研究。我非常感兴趣的替代方案,以确保可见性的变化,在一个算法,如这里介绍。

更新

我认为下面的替代实现比以前的实现有所改进。它在不影响并发性的情况下使用一个内部锁:

代码语言:javascript
复制
public class MatrixMultiplier {
    ...
    private final Object internalLock = new Object();

    public synchronized int[][] multiply() {
        if (result == null) {
            result = new int[numberOfMRows][numberOfNColumns];

            ExecutorService executor = createExecutor();

            Collection<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < numberOfMRows; i++) {
                final int finalI = i;
                for (int j = 0; j < numberOfNColumns; j++) {
                    final int finalJ = j;
                    tasks.add(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            calculateCell(finalI, finalJ);
                            synchronized (internalLock){}
                            return null;
                        }
                    });
                }
            }

            try {
                executor.invokeAll(tasks);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                executor.shutdownNow();
            }

        }

        synchronized (internalLock){}

        return result;
    }
    ...
}

这个选项更有效率,但是它和以前使用许多锁的实现在我看来都是正确的。我所有的观察都正确吗?在我的场景中,是否有更有效/更优雅的方法来处理同步问题?

EN

回答 1

Stack Overflow用户

发布于 2015-02-10 19:45:36

result声明为volatile只会确保更改result的引用(即result = ...;操作)对每个人都是可见的。

解决这一问题最明显的办法是消除副作用。在本例中,这很简单:只需使calculateCell()和调用它的Callable返回值,并让主线程将值写入数组。

当然,您可以进行显式锁定,就像您在第二个示例中所做的那样,但是,当您只使用一个锁时,使用nxm锁似乎太过了。当然,在您的示例中,一个锁会扼杀并行性,因此解决方案再次是让calculateCell()返回值,并且只在result数组中写入结果的过程中锁定。

或者,您可以使用ForkJoin,忘记整个事情,因为它会为您做这件事。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28438614

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档