In python2,我想通过填充并行进程(或线程)不同的子数组(总共有16个块)来填充全局数组。我必须精确地说明每个块不依赖于其他块,我的意思是当我处理当前块的每个单元格时。
1)根据我的发现,通过使用不同的"processes“,我将从CPU多核中获得很大的好处,但其他所有进程共享全局数组似乎有点复杂。
2)从另一个角度来看,我可以使用"threads“而不是"processes”,因为实现的难度较小。我发现来自"ThreadPool“的libray "multiprocessing.dummy”允许所有其他并发线程共享这个全局数组。
例如,在python2.7中,以下代码工作:
from multiprocessing.dummy import Pool as ThreadPool
## discretization along x-axis and y-axis for each block
arrayCross_k = np.linspace(kMIN, kMAX, dimPoints)
arrayCross_mu = np.linspace(-1, 1, dimPoints)
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
dimBlocks = 4
# Size of dimension along k and mu axis
dimPoints = 100
# dimension along one dimension of global arrayFullCross
dimMatCovCross = dimBlocks*dimPoints
# Build cross-correlation matrix
def buildCrossMatrix_loop(params_array):
# rows indices
xb = params_array[0]
# columns indices
yb = params_array[1]
# Current redshift
z = zrange[params_array[2]]
# Loop inside block
for ub in range(dimPoints):
for vb in range(dimPoints):
# Diagonal blocs
if (xb == yb):
# Fill the (xb,yb) su-block of global array by
arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])),
...
...
# End of function buildCrossMatrix_loop
# Main loop
while i < len(zrange):
def generatorCrossMatrix(index):
for igen in range(dimBlocks):
for lgen in range(dimBlocks):
yield igen, lgen, index
if __name__ == '__main__':
# Use 20 threads
pool = ThreadPool(20)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
# Increment index "i"
i = i+1但不幸的是,即使使用了20个线程,我也意识到我的CPU内核还没有完全运行(实际上,使用“top”或“htop”命令,我只看到一个100%的进程)。
3)如果我想充分利用我的CPU的16个核心,我必须选择什么策略(就像pool.map(function, generator)) but with also the sharing of global array的情况一样)?
4)有些人告诉我为每个子数组做I/O (基本上,在一个文件中写入每个块,通过读取这些子数组收集所有子数组,然后填充完整的数组)。这个解决方案很方便,但我想避免I/O (除非真的没有其他解决方案)。
5)我已经用MPI library和C language一起练习了填充子数组的操作,最后把它们集合起来构建一个大数组,并不是很复杂。但是,我不想在Python中使用MPI (我不知道它是否存在)。
6)我还尝试在上面的Process主循环中使用目标等于填充函数(buildCrossMatrix_loop)的while:
from multiprocessing import Process
# Main loop on z range
while i < len(zrange):
params_p = []
for ip in range(4):
for jp in range(4):
params_p.append(ip)
params_p.append(jp)
params_p.append(i)
p = Process(target=buildCrossMatrix_loop, args=(params_p,))
params_p = []
p.start()
# Finished : wait everybody
p.join()
...
...
i = i+1
# End of main while loop但最终的2D全局数组只填充零。因此,我必须推断,Process函数不共享要填充的数组?
( 7)我要寻找哪种策略?
1.使用“池进程”并找到一种方法来共享全局数组,知道我的所有16核都将运行。
2.使用“线程”和共享全局数组,但乍一看,性能似乎不如“池进程”那么好。也许有一种方法可以增加每个“线程”的功能,我的意思是使用“池进程”??。
我试着在https://docs.python.org/2/library/multiprocessing.html上学习不同的例子,但没有成功,也就是说,从加速的角度看,没有相关的性能。
我认为在我的例子中,arrayFullCross主要问题是收集所有子数组,或者全局数组不被其他进程或线程共享。。
如果有人在多线程上下文中有一个共享全局变量的简单例子(这里是一个数组),那么最好把它放在这里。
更新1:我用Threading (而不是multiprocessing)进行了测试,但是性能仍然很差。GIL显然没有被解锁,即在htop命令中只出现一个进程(可能线程库的版本不是正确的)。
因此,我将尝试使用“返回”方法来处理我的问题。
天真地,我试图在应用map函数的函数末尾返回整个数组,如下所示:
# Build cross-correlation matrix
def buildCrossMatrix_loop(params_array):
# rows indices
xb = params_array[0]
# columns indices
yb = params_array[1]
# Current redshift
z = zrange[params_array[2]]
# Loop inside block
for ub in range(dimPoints):
for vb in range(dimPoints):
# Diagonal blocs
if (xb == yb):
arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb])
...
... #others assignments on arrayFullCross elements
# Return global array to main process
return arrayFullCross然后,我试图像这样从map接收这个全局数组:
if __name__ == '__main__':
pool = Pool(16)
outputArray = pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.terminate()
## Print outputArray
print 'outputArray = ', outputArray
## Reshape 4D outputArray to 2D array
arrayFullCross2D_swap = np.array(outputArray).swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)不幸的是,当我打印outputArray时,我得到:
outputArray = [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]这不是所期望的4D outputArray,只是一个16无的列表(我认为16个对应于generatorCrossMatrix(i)提供的进程数量)。
一旦map启动并完成,我如何才能拿回整个4D数组?
发布于 2019-02-14 21:42:05
首先,我相信multiprocessing.ThreadPool是一个私有的API,所以您应该避免它。现在,multiprocessing.dummy是一个无用的模块。它做的是,而不是,做任何多线程/处理,这就是为什么您看不到任何好处。您应该使用“普通”multiprocessing模块。
第二个代码不工作,因为它使用多个进程。进程不共享内存,因此您在子进程中所做的更改不会反映在其他子进程或主进程中。你要么想:
multiprocessing.Pool.mapthreading而不是multiprocessing: just replace导入多进程with导入with,代码应该可以工作。请注意,threading版本只会工作,因为numpy在计算期间释放GIL,否则它将停留在1个CPU上。
您可能想看看几分钟前的this similar question,哪个I answered。
https://stackoverflow.com/questions/54697850
复制相似问题