I am trying to efficiently restructure a large multidimensional dataset. Assume that over time I have a number of remote sensing images with multiple bands, where the coordinates x, y are the pixel locations, time is the image acquisition time, and band is the different data collected.

In my use case, let's assume the xarray coord lengths are roughly x (3000), y (3000), time (10), and band (40), holding float data. So that's 100 GB+ of data.

I have been trying to work from this example, but I am having trouble translating it to this case.

Small dataset example

Note: the real data is much larger than this example.
import numpy as np
import dask.array as da
import xarray as xr
nrows = 100
ncols = 200
row_chunks = 50
col_chunks = 50
data = da.random.random(size=(1, nrows, ncols), chunks=(1, row_chunks, col_chunks))
def create_band(data, x, y, band_name):
    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    return x, y
x, y = create_coords(data, 1000, 2000, 30, 30)
src = []
for time in ['t1', 't2', 't3']:
    src_t = xr.concat([create_band(data, x, y, band) for band in ['blue', 'green', 'red', 'nir']], dim='band')\
        .expand_dims(dim='time')\
        .assign_coords({'time': [time]})
    src.append(src_t)
src = xr.concat(src, dim='time')
print(src)
<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (time: 3, band: 4, y: 100, x: 200)>
dask.array<concatenate, shape=(3, 4, 100, 200), dtype=float64, chunksize=(1, 1, 50, 50), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 1.015e+03 1.045e+03 1.075e+03 ... 6.985e+03 7.015e+03
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * y        (y) float64 1.985e+03 1.955e+03 1.924e+03 ... -984.7 -1.015e+03
  * time     (time) object 't1' 't2' 't3'

Restructuring: stack and transpose
I need to store the output of the following:
print(src.stack(sample=('y','x','time')).T)
<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (sample: 60000, band: 4)>
dask.array<transpose, shape=(60000, 4), dtype=float64, chunksize=(3600, 1), chunktype=numpy.ndarray>
Coordinates:
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * sample   (sample) MultiIndex
  - y        (sample) float64 1.985e+03 1.985e+03 ... -1.015e+03 -1.015e+03
  - x        (sample) float64 1.015e+03 1.015e+03 ... 7.015e+03 7.015e+03
  - time     (sample) object 't1' 't2' 't3' 't1' 't2' ... 't3' 't1' 't2' 't3'

I want to use dask and xarray to write the result to disk in chunks, so that it is accessible with open_mfdataset. parquet seems like a good option, but I can't figure out how to write it in chunks (src is too large to hold in memory).
import dask

# Attempt 1: wrap the stack in a delayed function, then write
@dask.delayed
def stacker(data):
    return data.stack(sample=('y','x','time')).T.to_pandas()

stacker(src).to_parquet('out_*.parquet')

# Attempt 2: stack and write inside a single function
def stack_write(data):
    data.stack(sample=('y','x','time')).T.to_pandas().to_parquet('out_*.parquet')
    return None

stack_write(src)

At this point, I'm just hoping for some good ideas. Thanks!
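For context, here is a minimal sketch of one possible direction (not from the original post): converting the band dimension into data variables yields a Dataset that can become a dask DataFrame, which dask then writes to parquet one partition at a time. It assumes a parquet engine such as pyarrow is installed; 'out_parquet' is an illustrative output directory name.

# Hedged sketch: avoid materializing the full stack; let dask.dataframe
# write the per-pixel rows partition by partition.
ds = src.to_dataset(dim='band')    # variables: blue, green, red, nir
df = ds.to_dask_dataframe()        # one row per (time, y, x) element
df.to_parquet('out_parquet', engine='pyarrow')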
Posted on 2020-09-30 09:26:42
I have a solution here (https://github.com/pydata/xarray/issues/1077#issuecomment-644803374) for writing a MultiIndexed dataset to file.
You have to manually "encode" the dataset into a form that can be written as netCDF, then "decode" it when you read it back.
import numpy as np
import pandas as pd
import xarray as xr
def encode_multiindex(ds, idxname):
    encoded = ds.reset_index(idxname)
    coords = dict(zip(ds.indexes[idxname].names, ds.indexes[idxname].levels))
    for coord in coords:
        encoded[coord] = coords[coord].values
    shape = [encoded.sizes[coord] for coord in coords]
    encoded[idxname] = np.ravel_multi_index(ds.indexes[idxname].codes, shape)
    encoded[idxname].attrs["compress"] = " ".join(ds.indexes[idxname].names)
    return encoded
def decode_to_multiindex(encoded, idxname):
    names = encoded[idxname].attrs["compress"].split(" ")
    shape = [encoded.sizes[dim] for dim in names]
    indices = np.unravel_index(encoded[idxname].values, shape)
    arrays = [encoded[dim].values[index] for dim, index in zip(names, indices)]
    mindex = pd.MultiIndex.from_arrays(arrays, names=names)
    decoded = xr.Dataset({}, {idxname: mindex})
    for varname in encoded.data_vars:
        if idxname in encoded[varname].dims:
            decoded[varname] = (idxname, encoded[varname].values)
    return decoded
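As a hedged usage sketch (not in the original answer), assuming stacked is an xarray.Dataset with a MultiIndex dimension named "sample", the round trip would look like:

# Write: flatten the MultiIndex into an integer index plus level coordinates
encoded = encode_multiindex(stacked, "sample")
encoded.to_netcdf("stacked.nc")

# Read: rebuild the MultiIndex from the stored codes and level values
decoded = decode_to_multiindex(xr.open_dataset("stacked.nc"), "sample")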
Posted on 2020-11-16 02:20:29

This is not a solution yet, but a version of your code, modified so that it is easy to reproduce if anyone else wants to tackle this problem:

The problem lies in the stack operation (concatenated.stack(sample=('y','x','time'))). At this step, memory usage keeps growing until the process is killed.

The concatenated object is a "Dask-backed" xarray.DataArray, so we would expect Dask to perform the stack operation lazily. Why, then, is the process killed at this step?

There are two possibilities for what is happening here:

1. The stack operation actually is done lazily by Dask, but because the data is so huge, even the minimum memory Dask requires is too much.
2. The stack operation is not Dask-backed.

import numpy as np
import dask.array as da
import xarray as xr
from numpy.random import RandomState
nrows = 20000
ncols = 20000
row_chunks = 500
col_chunks = 500
# Create a reproducible random numpy array
prng = RandomState(1234567890)
numpy_array = prng.rand(1, nrows, ncols)
data = da.from_array(numpy_array, chunks=(1, row_chunks, col_chunks))
def create_band(data, x, y, band_name):
    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    return x, y
x, y = create_coords(data, 1000, 2000, 30, 30)
bands = ['blue', 'green', 'red', 'nir']
times = ['t1', 't2', 't3']
bands_list = [create_band(data, x, y, band) for band in bands]
src = []
for time in times:
    src_t = xr.concat(bands_list, dim='band')\
        .expand_dims(dim='time')\
        .assign_coords({'time': [time]})
    src.append(src_t)
concatenated = xr.concat(src, dim='time')
print(concatenated)
# computed = concatenated.compute() # "computed" is ~35.8GB
stacked = concatenated.stack(sample=('y','x','time'))
transposed = stacked.T

You can try changing the values of nrows and ncols to vary the size of concatenated. And to improve performance we can/should also vary the chunks.
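As a hedged illustration of that rechunking idea (the chunk sizes below are arbitrary, not recommendations): fewer, larger chunks mean fewer tasks for Dask to track through the stack.

# Hedged sketch: rechunk before stacking; sizes are illustrative only
rechunked = concatenated.chunk({'time': 1, 'band': 1, 'y': 2000, 'x': 2000})
stacked = rechunked.stack(sample=('y', 'x', 'time'))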
Note: I even tried this:

concatenated.to_netcdf("concatenated.nc")
concatenated = xr.open_dataarray("concatenated.nc", chunks=10)

This was to make sure it is a Dask-backed DataArray, and to be able to adjust the chunks. I tried different values for chunks, but it always ran out of memory.
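For what it's worth, a hedged variant of that reopening step: chunks also accepts a per-dimension mapping, and the type of the underlying array confirms whether the result is lazily loaded.

# Hedged sketch: per-dimension chunk sizes (values are illustrative)
concatenated = xr.open_dataarray(
    "concatenated.nc",
    chunks={'time': 1, 'band': 1, 'y': 1000, 'x': 1000},
)
print(type(concatenated.data))  # dask.array.core.Array when Dask-backed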
https://stackoverflow.com/questions/63906769