我有一个dask数据框架,由拼图支持。它有1.31亿行,当我对整个帧进行一些基本操作时,它们需要几分钟。
df = dd.read_parquet('data_*.pqt')
unique_locations = df.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
total_locations = len(unique_locations)
n_https = https.sum().compute()
n_http = http.sum().compute()时间:
CPU times: user 2min 49s, sys: 23.9 s, total: 3min 13s
Wall time: 1min 53s我天真地想,如果我这次取一个我可以取下来的数据样本,并做到了:
df = dd.read_parquet('data_*.pqt')
df = df.sample(frac=0.05)
unique_locations = df.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
total_locations = len(unique_locations)
n_https = https.sum().compute()
n_http = http.sum().compute()时间:
Unknown, I stopped it after 45minutes.我猜我的样本不能有效地访问我的所有后续计算,但我不知道如何修复它。
我感兴趣的是从dask数据帧中采样数据,然后处理该样本的最佳方法。
发布于 2018-03-10 14:46:37
我没有一个明确/简单的答案,但我确实有很多东西可以一起解决我的问题。
1)我的代码效率很低,挑选出我需要处理的特定列就能让一切正常工作。我的新代码:
import dask.dataframe as dd
from dask.distributed import Client, progress
client = Client() # Took me a little while to get the settings correct
def get_df(*columns):
files = '../cache_new/sample_*.pqt'
df = dd.read_parquet(files, columns=columns, engine='pyarrow')
return df
# All data - Takes 31s
df_all = get_df('location')
unique_locations = df_all.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
_total_locations = unique_locations.size.persist()
_n_https = https.sum().persist()
_n_http = http.sum().persist()
progress(_total_locations, _n_https, _n_http)
# 1% sample data - Takes 21s
df_sample = get_df('location').sample(frac=0.01)
unique_locations = df_sample.location.unique()
https = unique_locations.str.startswith('https:')
http = unique_locations.str.startswith('http:')
_total_locations = unique_locations.size.persist()
_n_https = https.sum().persist()
_n_http = http.sum().persist()
progress(_total_locations, _n_https, _n_http)事实证明,这并不是很大的速度提升。整个计算所用的时间主要是读取数据。如果计算非常昂贵,我想我会看到更多的速度加快。
2)我切换到在本地使用分布式调度程序,这样我就可以看到发生了什么。但这并不是没有问题:
3)我在笔记本中多次读取相同数据时发现了一个bug -- https://github.com/dask/dask/issues/3268
4)我还遇到了pandas https://github.com/pandas-dev/pandas/issues/19941#issuecomment-371960712中的内存泄漏错误
对于(3)和(4),以及在我的原始代码中我低效地读取所有列的事实,我看到了许多原因,为什么我的样本从未工作过,尽管我从未找到明确的答案。
发布于 2018-03-11 07:38:42
这里发生的情况是,通过添加样本,您正在阻止优化的发生。执行以下操作时:
df = dd.read_parquet('data_*.pqt')
df.x.sum()Dask巧妙地将其重新安排为以下内容:
df = dd.read_parquet('data_*.pqt', columns=['x'])
df.x.sum()Dask.dataframe只读入您需要的一列。这是dask.dataframe提供的为数不多的优化之一(它不做太多的高级优化)。
但是,当您将一个样本放入其中时(或任何操作)
df = dd.read_parquet('data_*.pqt', columns=['x'])
df.sample(...).x.sum()然后你得不到优化,所以一切都很慢。
所以这里的问题不是样本太慢,而是来自parquet的整个数据集都很慢,而且在read_parquet和列访问步骤之间有样本会阻止优化的发生。
始终在read_parquet中指定列
要避免这种情况,应该始终在dd.read_parquet中显式指定所需的列。
最终,如果能看到一些高级框架提供比Dask dataframe更智能的查询优化,那将是一件很好的事情。如果你想推动这一点,你可能会在Ibis上提出一个问题
https://stackoverflow.com/questions/49185950
复制相似问题