玩具示例
我有两个数组,它们的形状不同,例如:
import numpy as np
matrix = np.arange(5*6*7*8).reshape(5, 6, 7, 8)
vector = np.arange(1, 20, 2)我要做的是把矩阵的每一个元素乘以‘向量’的一个元素,然后在最后两个轴上做和。为此,我有一个与“矩阵”形状相同的数组,它告诉我要使用哪个索引,例如:
Idx = (matrix+np.random.randint(0, vector.size, size=matrix.shape))%vector.size我知道解决办法之一是:
matVec = vector[Idx]
res = np.sum(matrix*matVec, axis=(2, 3))甚至:
res = np.einsum('ijkl, ijkl -> ij', matrix, matVec)问题
但是,我的问题是,我的数组很大,matVec的构造需要时间和内存。那么,是否有办法绕过这一问题,并取得同样的结果呢?
更真实的示例
下面是一个更现实的例子,说明我实际上在做什么:
import numpy as np
order = 20
dim = 23
listOrder = np.arange(-order, order+1, 1)
N, P = np.meshgrid(listOrder, listOrder)
K = np.arange(-2*dim+1, 2*dim+1, 1)
X = np.arange(-2*dim, 2*dim, 1)
tN = np.einsum('..., p, x -> ...px', N, np.ones(K.shape, dtype=int), np.ones(X.shape, dtype=int))#, optimize=pathInt)
tP = np.einsum('..., p, x -> ...px', P, np.ones(K.shape, dtype=int), np.ones(X.shape, dtype=int))#, optimize=pathInt)
tK = np.einsum('..., p, x -> ...px', np.ones(P.shape, dtype=int), K, np.ones(X.shape, dtype=int))#, optimize=pathInt)
tX = np.einsum('..., p, x -> ...px', np.ones(P.shape, dtype=int), np.ones(K.shape, dtype=int), X)#, optimize=pathInt)
tL = tK + tX
mini, maxi = -4*dim+1, 4*dim-1
NmPp2L = np.arange(2*mini-2*order, 2*maxi+2*order+1)
Idx = (2*tL+tN-tP) - NmPp2L[0]
np.random.seed(0)
matrix = (np.random.rand(Idx.size) + 1j*np.random.rand(Idx.size)).reshape(Idx.shape)
vector = np.random.rand(np.max(Idx)+1) + 1j*np.random.rand(np.max(Idx)+1)
res = np.sum(matrix*vector[Idx], axis=(2, 3))发布于 2022-09-13 13:16:15
对于较大的数据数组
import numpy as np
matrix = np.arange(50*60*70*80).reshape(50, 60, 70, 80)
vector = np.arange(1, 50, 2)
Idx = (matrix+np.random.randint(0, vector.size, size=matrix.shape))%vector.size并行numba加快了计算速度,避免了创建matVec。
英特尔四核Xeon铂8259 On的研制
matVec = vector[Idx]
# %timeit 48.4 ms ± 167 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
res = np.einsum('ijkl, ijkl -> ij', matrix, matVec)
# %timeit 26.9 ms ± 40.7 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)针对未优化的numba实现
import numba as nb
@nb.njit(parallel=True)
def func(matrix, idx, vector):
res = np.zeros((matrix.shape[0],matrix.shape[1]), dtype=matrix.dtype)
for i in nb.prange(matrix.shape[0]):
for j in range(matrix.shape[1]):
for k in range(matrix.shape[2]):
for l in range(matrix.shape[3]):
res[i,j] += matrix[i,j,k,l] * vector[idx[i,j,k,l]]
return res
func(matrix, Idx, vector) # compile run
func(matrix, Idx, vector)
# %timeit 21.7 ms ± 486 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
# (48.4 + 26.9) / 21.7 = ~3.47x speed up
np.testing.assert_equal(func(matrix, Idx, vector), np.einsum('ijkl, ijkl -> ij', matrix, matVec))性能和进一步优化
当处理复数时,上面的Numba代码应该是内存绑定的。事实上,matrix和Idx都很大,必须从相对缓慢的内存中完全读取.对于目标平台,matrix的大小为41*41*92*92*8*2 = 217.10 MiB和Idx,大小为41*41*92*92*8 = 108.55 MiB或41*41*92*92*4 = 54.28 MiB (在大多数x86-64 Windows平台上应该是int32类型,在大多数Linux86-64平台上应该是int64 )。这也是为什么vector[Idx]速度慢的原因: Numpy需要在内存中写入一个大数组(更不用说写数据比在x86-64平台上读取数据慢两倍了)。
假设代码是内存绑定的,加快速度的唯一方法是减少从RAM读取的数据量。这可以通过将Idx存储在uint16数组中来实现,而不是默认的np.int_数据类型(大2~4)。这是可能的,因为vector.size很小。在一个具有i5-9600KF处理器和38-40 GiB/s的Linux上,这种优化提高了大约29%的速度,而代码仍然主要是内存绑定的。在这个平台上,实现几乎是最优的。
https://stackoverflow.com/questions/73702569
复制相似问题