我的问题是,我有一个迭代算法,在每次迭代时,它需要执行几个矩阵-矩阵乘法点(A_i,B_i),因为i=1.由于这些乘法是用Numpy的点来执行的,我知道他们正在调用BLAS-3实现,这是相当快的。问题是,电话的数量是巨大的,这是我的程序的一个瓶颈。我想通过生产更少的产品,但使用更大的矩阵来最小化所有这些调用的开销。
为了简单起见,考虑所有矩阵都是not (通常n不是很大,它的范围在1到1000之间)。解决我的问题的一个方法是考虑块对角线矩阵诊断(A_i),然后执行下面的产品。

这只是对函数点的一次调用,但是现在程序浪费了很多次,用零执行乘法。这个想法似乎行不通,但它给出了A_1 B_1,.,A_k B_k的结果,即所有产品都堆积在一个大矩阵中。
我的问题是,是否有方法用一个函数调用计算A_1 B_1,.,A_k B_k?或者更重要的是,我如何计算这些产品比做一个Numpy点的循环更快?
发布于 2019-12-16 12:14:10
它取决于矩阵的大小。
编辑
对于较大的nxn矩阵(aprox )。20)编译代码中的BLAS调用更快,因为较小的矩阵自定义Numba或Cython内核通常更快。
下面的方法为给定的输入形状生成自定义点函数。使用这种方法,还可以从编译器相关的优化(如循环展开)中获益,这些优化对于小矩阵特别重要。
必须指出的是,生成和编译一个内核需要大约的时间。因此,请确保只有在必须时才调用生成器。
生成器函数
def gen_dot_nm(x,y,z):
#small kernels
@nb.njit(fastmath=True,parallel=True)
def dot_numba(A,B):
"""
calculate dot product for (x,y)x(y,z)
"""
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
assert A.shape[1]==x
assert B.shape[1]==y
assert B.shape[2]==z
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(x):
for j in range(z):
acc=0.
for k in range(y):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
#large kernels
@nb.njit(fastmath=True,parallel=True)
def dot_BLAS(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii]=np.dot(A[ii],B[ii])
return res
#At square matices above size 20
#calling BLAS is faster
if x>=20 or y>=20 or z>=20:
return dot_BLAS
else:
return dot_numba使用示例
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
dot22=gen_dot_nm(2,2,2)
X=dot22(A,B)
%timeit X3=dot22(A,B)
#5.94 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) 旧答案
另一种选择,但还有更多的工作要做,将是使用一些特殊的BLAS实现,它为非常小的矩阵及时创建定制内核,而不是从C调用这个内核。
示例
import numpy as np
import numba as nb
#Don't use this for larger submatrices
@nb.njit(fastmath=True,parallel=True)
def dot(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[2]==B.shape[1]
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
for i in range(A.shape[1]):
for j in range(B.shape[2]):
acc=0.
for k in range(B.shape[1]):
acc+=A[ii,i,k]*B[ii,k,j]
res[ii,i,j]=acc
return res
@nb.njit(fastmath=True,parallel=True)
def dot_22(A,B):
assert A.shape[0]==B.shape[0]
assert A.shape[1]==2
assert A.shape[2]==2
assert B.shape[1]==2
assert B.shape[2]==2
res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
for ii in nb.prange(A.shape[0]):
res[ii,0,0]=A[ii,0,0]*B[ii,0,0]+A[ii,0,1]*B[ii,1,0]
res[ii,0,1]=A[ii,0,0]*B[ii,0,1]+A[ii,0,1]*B[ii,1,1]
res[ii,1,0]=A[ii,1,0]*B[ii,0,0]+A[ii,1,1]*B[ii,1,0]
res[ii,1,1]=A[ii,1,0]*B[ii,0,1]+A[ii,1,1]*B[ii,1,1]
return res时间
A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)
X=A@B
X2=np.einsum("xik,xkj->xij",A,B)
X3=dot_22(A,B) #avoid measurig compilation overhead
X4=dot(A,B) #avoid measurig compilation overhead
%timeit X=A@B
#262 µs ± 2.55 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.einsum("xik,xkj->xij",A,B,optimize=True)
#264 µs ± 3.22 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit X3=dot_22(A,B)
#5.68 µs ± 27.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit X4=dot(A,B)
#9.79 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)发布于 2019-12-15 21:26:49
可以将数组堆叠成形状(k,n,n),并调用numpy.matmul或使用@运算符。
例如,
In [18]: A0 = np.array([[1, 2], [3, 4]])
In [19]: A1 = np.array([[1, 2], [-3, 5]])
In [20]: A2 = np.array([[4, 0], [1, 1]])
In [21]: B0 = np.array([[1, 4], [-3, 4]])
In [22]: B1 = np.array([[2, 1], [1, 1]])
In [23]: B2 = np.array([[-2, 9], [0, 1]])
In [24]: np.matmul([A0, A1, A2], [B0, B1, B2])
Out[24]:
array([[[-5, 12],
[-9, 28]],
[[ 4, 3],
[-1, 2]],
[[-8, 36],
[-2, 10]]])或者,使用@
In [32]: A = np.array([A0, A1, A2])
In [33]: A
Out[33]:
array([[[ 1, 2],
[ 3, 4]],
[[ 1, 2],
[-3, 5]],
[[ 4, 0],
[ 1, 1]]])
In [34]: B = np.array([B0, B1, B2])
In [35]: A @ B
Out[35]:
array([[[-5, 12],
[-9, 28]],
[[ 4, 3],
[-1, 2]],
[[-8, 36],
[-2, 10]]])发布于 2019-12-17 06:49:01
如果你不想浪费时间乘以零,那么你真正想要的是稀疏矩阵。使用@WarrenWeckesser中的A和B矩阵:
from scipy import sparse
sparse.block_diag((A0, A1, A2), format = "csr") @ np.concatenate((B0, B1, B2), axis = 0)
Out[]:
array([[-5, 12],
[-9, 28],
[ 4, 3],
[-1, 2],
[-8, 36],
[-2, 10]], dtype=int32)对于大型矩阵来说,这可能是一种加速。对于较小的用户,@max9111可能有正确的使用numba的想法。
https://stackoverflow.com/questions/59347796
复制相似问题