首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >对于大于内存的size=(M,N) dask数组:如何从chunks=(1,N)重新分块到chunks=(M,1)?

对于大于内存的size=(M,N) dask数组:如何从chunks=(1,N)重新分块到chunks=(M,1)?
EN

Stack Overflow用户
提问于 2019-03-24 22:31:14
回答 2查看 477关注 0票数 2

为了,例如,在整个轴上应用一个用Numpy/Numba编码的IIR过滤器,我需要用m1 < m0将一个size=(M, N) dask数组从chunks=(m0, n0)重新分块到chunks=(m1, N)

由于Dask避免了重复的任务,因此在重新分块/重新分块合并期间,它将拥有价值(m0, N) (x 2?)的数据。在内存中。有没有一种优化图表的方法来避免这种行为?

我知道在哪里可以找到手动优化Dask图的信息。但是,有没有一种方法可以调整调度策略以允许重复任务,或者(自动)重新排列图形,以便在此rechunk期间最大限度地减少内存使用?

下面是一个最小的例子(对于chunks=(M, 1) chunks=(1, N)→的极端情况):

代码语言:javascript
复制
from dask import array as da
from dask.distributed import Client

# limit memory to 4 GB
client = Client(memory_limit=4e9)

# Create 80 GB random array with chunks=(M, 1)
arr = da.random.uniform(-1, 1, size=(1e5, 1e5), chunks=(1e5, 1))

# Compute mean (This works!)
arr.mean().compute()

# Rechunk to chunks=(1, N)
arr = arr.rechunk((1, 1e5))

# Compute mean (This hits memory limit!)
arr.mean().compute()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-03-27 12:48:41

不幸的是,在最坏的情况下,您需要计算每个输入块,然后才能获得单个输出块。

Dask的重新分块操作很不错,它们会在过渡期间将数据重新分块成中等大小的块,所以这可能会在内存不足的情况下工作,但您肯定会将数据写到磁盘上。

简而言之,原则上没有什么是你应该额外做的。理论上,Dask的重新分块算法应该能处理这个问题。如果你愿意,你可以使用threshold=block_size_limit=关键字重新分块。

票数 2
EN

Stack Overflow用户

发布于 2019-03-29 16:17:12

block_size_limit=关键字提供了某种解决方案。

(下面,我使用一个较小的数组,因为我没有剩余的80 to磁盘可供溢出。)

代码语言:javascript
复制
from dask import array as da
from dask.distributed import Client

# limit memory to 1 GB
client = Client(n_workers=1, threads_per_worker=1, memory_limit=1e9)

# Create 3.2 GB array
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 2000 nodes

# Compute
print(arr.mean().compute())  # Takes 11.9 seconds. Doesn't spill.

# re-create array and rechunk with block_size_limit=1e3
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4), block_size_limit=1e3)

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 32539 nodes

# Compute
print(arr.mean().compute())  # Takes 140 seconds, spills ~5GB to disk.

# re-create array and rechunk with default kwargs
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4))

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 9206 nodes

# Compute
print(arr.mean().compute())  # Worker dies at 95% memory use
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55324848

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档