为了,例如,在整个轴上应用一个用Numpy/Numba编码的IIR过滤器,我需要用m1 < m0将一个size=(M, N) dask数组从chunks=(m0, n0)重新分块到chunks=(m1, N)。
由于Dask避免了重复的任务,因此在重新分块/重新分块合并期间,它将拥有价值(m0, N) (x 2?)的数据。在内存中。有没有一种优化图表的方法来避免这种行为?
我知道在哪里可以找到手动优化Dask图的信息。但是,有没有一种方法可以调整调度策略以允许重复任务,或者(自动)重新排列图形,以便在此rechunk期间最大限度地减少内存使用?
下面是一个最小的例子(对于chunks=(M, 1) chunks=(1, N)→的极端情况):
from dask import array as da
from dask.distributed import Client
# limit memory to 4 GB
client = Client(memory_limit=4e9)
# Create 80 GB random array with chunks=(M, 1)
arr = da.random.uniform(-1, 1, size=(1e5, 1e5), chunks=(1e5, 1))
# Compute mean (This works!)
arr.mean().compute()
# Rechunk to chunks=(1, N)
arr = arr.rechunk((1, 1e5))
# Compute mean (This hits memory limit!)
arr.mean().compute()发布于 2019-03-27 12:48:41
不幸的是,在最坏的情况下,您需要计算每个输入块,然后才能获得单个输出块。
Dask的重新分块操作很不错,它们会在过渡期间将数据重新分块成中等大小的块,所以这可能会在内存不足的情况下工作,但您肯定会将数据写到磁盘上。
简而言之,原则上没有什么是你应该额外做的。理论上,Dask的重新分块算法应该能处理这个问题。如果你愿意,你可以使用threshold=和block_size_limit=关键字重新分块。
发布于 2019-03-29 16:17:12
block_size_limit=关键字提供了某种解决方案。
(下面,我使用一个较小的数组,因为我没有剩余的80 to磁盘可供溢出。)
from dask import array as da
from dask.distributed import Client
# limit memory to 1 GB
client = Client(n_workers=1, threads_per_worker=1, memory_limit=1e9)
# Create 3.2 GB array
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph") # 2000 nodes
# Compute
print(arr.mean().compute()) # Takes 11.9 seconds. Doesn't spill.
# re-create array and rechunk with block_size_limit=1e3
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4), block_size_limit=1e3)
# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph") # 32539 nodes
# Compute
print(arr.mean().compute()) # Takes 140 seconds, spills ~5GB to disk.
# re-create array and rechunk with default kwargs
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4))
# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph") # 9206 nodes
# Compute
print(arr.mean().compute()) # Worker dies at 95% memory usehttps://stackoverflow.com/questions/55324848
复制相似问题