我正在尝试对跨工人的大型文件执行一个基本的ETL工作流,使用dask-cudf跨大量工人。
问题:
最初,scheduler计划在工人之间读取同等数量的partitions,但在预处理期间,它倾向于在工人之间分发/洗牌。
工作人员获得的分区的最小数目是4,而它得到的最大分区是19 (total partitions = apprx. 300,num_workers = 22) --这种行为会在下游引发问题,因为我希望在工作人员之间分配相同的分区。
有防止这种行为的方法吗?
我想下面这会有帮助,但它没有帮助。
# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})正在执行的工作流程:
df = dask_cudf.read_csv(path = `big_files`,
names = names,
delimiter='\t',
dtype = read_dtype_ls,
chunksize=chunksize)
df = df.map_partitions(lambda df:df.fillna(-1))
def transform_col_int64_to_int32(df, columns):
"""
This function casts int64s columns to int32s
we are using this to transform int64s to int32s and overflows seem to be consitent
"""
for col in columns:
df[col] = df[col].astype(np.int32)
return df
df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()发布于 2019-10-04 18:47:39
根据多个因素执行任务的Dask计划,包括数据依赖、运行时、内存使用等。通常情况下,这些问题的答案是“让它做它自己的事”。调度不良的最常见原因是块太少。
但是,如果您显式地需要一个更重平衡的分布,那么您可以尝试使用Client.rebalance方法。
wait(df)
client.rebalance(df)但是,请注意,再平衡并不像其他Dask操作那样健壮。最好是在没有进行大量其他工作(因此调用dask.distributed.wait )的时候这样做。
而且,我还会开始偷工作。窃取工作是负载平衡的另一个名称。
https://stackoverflow.com/questions/58241582
复制相似问题