我目前有数据简单的数据处理,涉及groupby、merge和并行列到列操作。不那么简单的部分是大量使用的行(其详细的成本/财务数据)。它的大小是300-400 gb。
由于内存有限,目前我使用的是dask的核心外计算。但是,它真的很慢。
我以前读过使用CuDF来提高map_partitions和groupby的性能,但是大多数例子是使用中高端gpu (至少1050 to,大多数运行在基于gv的云vm上),而且数据可以在gpu上使用。
我的机器规范是E5-2620v3(6C/12T)、128 2gb和K620 (只有2GB专用的vram)。
使用的中间数据存储在地板上。
如果我使用使用的低端GPU,它会使它更快吗?在GPU中是否有可能在核心计算之外完成?(例如,im环顾四周,但尚未找到)
下面是我试图做的事情的简化伪代码
a.csv是大小为300 in的数据,由3列(Hier1、Hier2、Hier3、value)组成,Hier1-3为层次结构,以字符串表示。value是销售值b.csv是大小为50 in的数据,由3列(Hier1、Hier2、valuetype、cost)组成。Hier1 1-2是层次结构,以字符串表示。值类型是成本类型,以字符串为单位。成本是成本价值
基本上,我需要根据a.csv的销售值按比例计算b.csv中的每一项成本。我的想法是,在Hier3级别(这是更详细的级别),每个成本都可用。
第一步是建立比例比例:
import dask.dataframe as dd
# read raw data, repartition, convert to parquet for both file
raw_reff = dd.read_csv('data/a.csv')
raw_reff = raw_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
raw_reff = raw_reff.set_index('PartGroup')
raw_reff.to_parquet("data/raw_a.parquet")
cost_reff = dd.read_csv('data/b.csv')
cost_reff = cost_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
cost_reff = cost_reff.set_index('PartGroup')
cost_reff.to_parquet("data/raw_b.parquet")
# create reference ratio
ratio_reff = dd.read_parquet("data/raw_a.parquet").reset_index()
#to push down ram usage, instead of dask groupby im using groupby on each partition. Should be ok since its already partitioned above on each group
ratio_reff = ratio_reff.map_partitions(lambda df: df.groupby(['PartGroup'])['value'].sum().reset_index())
ratio_reff = ratio_reff.set_index('PartGroup')
ratio_reff = ratio_reff.map_partitions(lambda df: df.rename(columns={'value':'value_on_group'}))
ratio_reff.to_parquet("data/reff_a.parquet")然后进行合并以得到比率
raw_data = dd.read_parquet("data/raw_a.parquet").reset_index()
reff_data = dd.read_parquet("data/reff_a.parquet").reset_index()
ratio_data = raw_data.merge(reff_data, on=['PartGroup'], how='left')
ratio_data['RATIO'] = ratio_data['value'].fillna(0)/ratio_data['value_on_group'].fillna(0)
ratio_data = ratio_data[['PartGroup','Hier3','RATIO']]
ratio_data = ratio_data.set_index('PartGroup')
ratio_data.to_parquet("data/ratio_a.parquet")然后将PartGroup上的成本数据合并并乘以比率,使其按比例计算值。
reff_stg = dd.read_parquet("data/ratio_a.parquet").reset_index()
cost_stg = dd.read_parquet("data/raw_b.parquet").reset_index()
final_stg = reff_stg.merge(cost_stg, on=['PartGroup'], how='left')
final_stg['allocated_cost'] = final_stg['RATIO']*final_stg['cost']
final_stg = final_stg.set_index('PartGroup')
final_stg.to_parquet("data/result_pass1.parquet")在实际情况下,由于缺少引用数据等原因,将产生剩余价值,并使用多个引用在几次传递中完成,但基本上是上述步骤。
即使是严格的地板操作,它仍然要从我的128 to内存中取出~80 to内存,我所有的核心运行100%,3-4天运行。我正在寻找方法,使这与目前的硬件做得更快。如您所见,它的大规模参数化问题符合gpu处理的定义。
谢谢
发布于 2020-02-20 21:43:33
@Ditto,不幸的是,这不能用您当前的硬件完成。你的K620有开普勒架构的GPU,低于急流的最低要求。你需要一个帕斯卡卡或更好的运行急流。好消息是,如果购买与RAPIDS兼容的视频卡不是一个可行的选择,有许多廉价的云供应选项。老实说,你要做的事,我想要一个额外的GPU处理速度,并建议使用多GPU设置。
对于比GPU更大的数据集,可以使用dask_cudf来处理数据集。在我们的文档和笔记本中有几个例子。请注意,在dask.compute()之后产生的数据集需要能够安装在GPU中。
https://rapidsai.github.io/projects/cudf/en/0.12.0/10min.html#10-Minutes-to-cuDF-and-Dask-cuDF
https://rapidsai.github.io/projects/cudf/en/0.12.0/dask-cudf.html#multi-gpu-with-dask-cudf
一旦您可以获得一个工作,快速兼容,多GPU的设置和使用dask_cudf,您应该得到一个非常值得的同时,加快,特别是对于这种规模的数据探索。
希望这能有所帮助!
https://stackoverflow.com/questions/59675392
复制相似问题