我正在试验一些技术,以确保使用Java框架执行的并发任务所完成的副作用的可见性。作为一个简单的场景,考虑了矩阵乘法的一个假设问题。
假设要乘的矩阵相当大(例如,几千行和列),为了加速这些矩阵的乘法,我实现了一种并行算法,其中计算结果矩阵中的每个单元格被认为是一个独立的(即可并行的)任务。为了简化一点,让我们忽略一下,对于小输入矩阵,这种并行化可能不是一个好主意。
因此,下面是我的程序的第一个版本:
public class MatrixMultiplier {
private final int[][] m;
private final int[][] n;
private volatile int[][] result; //the (lazily computed) result of the matrix multiplication
private final int numberOfMRows; //number of rows in M
private final int numberOfNColumns; //number of columns in N
private final int commonMatrixDimension; //number of columns in M and rows in N
public MatrixMultiplier(int[][] m, int[][] n) {
if(m[0].length != n.length)
throw new IllegalArgumentException("Uncompatible arguments: " + Arrays.toString(m) + " and " + Arrays.toString(n));
this.m = m;
this.n = n;
this.numberOfMRows = m.length;
this.numberOfNColumns = n[0].length;
this.commonMatrixDimension = n.length;
}
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
calculateCell(finalI, finalJ);
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
}
return result;
}
private ExecutorService createExecutor() {
final int availableProcessors = Runtime.getRuntime().availableProcessors();
final int processorsBound = availableProcessors + 1;
final int maxConcurrency = numberOfMRows * numberOfNColumns;
final int threadPoolSize = maxConcurrency < processorsBound ? maxConcurrency : processorsBound;
return Executors.newFixedThreadPool(threadPoolSize);
}
private void calculateCell(int mRow, int nColumn) {
int sum = 0;
for (int k = 0; k < commonMatrixDimension; k++) {
sum += m[mRow][k] * n[k][nColumn];
}
result[mRow][nColumn] = sum;
}
}据我所知,这个实现有一个问题:执行的任务对result矩阵的一些修改不一定对调用multiply()的线程可见。
假设前面的内容是正确的,那么考虑依赖显式锁的multiply()的替代实现(与锁相关的新代码用//<LRC>注释):
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
final Lock[][] locks = new Lock[numberOfMRows][numberOfNColumns]; //<LRC>
for (int i = 0; i < numberOfMRows; i++) { //<LRC>
for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
locks[i][j] = new ReentrantLock(); //<LRC>
} //<LRC>
} //<LRC>
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
try { //<LRC>
locks[finalI][finalJ].lock(); //<LRC>
calculateCell(finalI, finalJ);
} finally { //<LRC>
locks[finalI][finalJ].unlock(); //<LRC>
} //<LRC>
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
for (int i = 0; i < numberOfMRows; i++) { //<LRC>
for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
locks[i][j].lock(); //<LRC>
locks[i][j].unlock(); //<LRC>
} //<LRC>
} //<LRC>
}
return result;
}使用上面的显式锁作为唯一的目标,以确保发布对调用线程的更改,因为不存在任何争用的可能性。
我的主要问题是,这是否是在我的场景中发布副作用问题的有效解决方案。
第二个问题是:是否有更有效/更优雅的方法来解决这个问题?请注意,我并不是在寻找并行化矩阵乘法的替代算法实现(例如Strassen算法),因为我的算法只是一个简单的案例研究。我非常感兴趣的替代方案,以确保可见性的变化,在一个算法,如这里介绍。
更新
我认为下面的替代实现比以前的实现有所改进。它在不影响并发性的情况下使用一个内部锁:
public class MatrixMultiplier {
...
private final Object internalLock = new Object();
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
calculateCell(finalI, finalJ);
synchronized (internalLock){}
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
}
synchronized (internalLock){}
return result;
}
...
}这个选项更有效率,但是它和以前使用许多锁的实现在我看来都是正确的。我所有的观察都正确吗?在我的场景中,是否有更有效/更优雅的方法来处理同步问题?
发布于 2015-02-10 19:45:36
将result声明为volatile只会确保更改result的引用(即result = ...;操作)对每个人都是可见的。
解决这一问题最明显的办法是消除副作用。在本例中,这很简单:只需使calculateCell()和调用它的Callable返回值,并让主线程将值写入数组。
当然,您可以进行显式锁定,就像您在第二个示例中所做的那样,但是,当您只使用一个锁时,使用nxm锁似乎太过了。当然,在您的示例中,一个锁会扼杀并行性,因此解决方案再次是让calculateCell()返回值,并且只在result数组中写入结果的过程中锁定。
或者,您可以使用ForkJoin,忘记整个事情,因为它会为您做这件事。
https://stackoverflow.com/questions/28438614
复制相似问题