首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何确保“分区”的数量在达斯克和达斯克-cudf的工人中平均分配?

如何确保“分区”的数量在达斯克和达斯克-cudf的工人中平均分配?
EN

Stack Overflow用户
提问于 2019-10-04 18:33:02
回答 1查看 581关注 0票数 1

我正在尝试对跨工人的大型文件执行一个基本的ETL工作流,使用dask-cudf跨大量工人。

问题:

最初,scheduler计划在工人之间读取同等数量的partitions,但在预处理期间,它倾向于在工人之间分发/洗牌。

工作人员获得的分区的最小数目是4,而它得到的最大分区是19 (total partitions = apprx. 300num_workers = 22) --这种行为会在下游引发问题,因为我希望在工作人员之间分配相同的分区。

有防止这种行为的方法吗?

我想下面这会有帮助,但它没有帮助。

代码语言:javascript
复制
# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

正在执行的工作流程:

  • 朗读
  • 补纳
  • 下浇铸/其他逻辑
代码语言:javascript
复制
df = dask_cudf.read_csv(path = `big_files`,
                        names = names,
                        delimiter='\t',
                        dtype = read_dtype_ls,
                        chunksize=chunksize)


df = df.map_partitions(lambda df:df.fillna(-1))

def transform_col_int64_to_int32(df, columns):
    """
        This function casts int64s columns to int32s 
        we are using this to transform int64s to int32s and overflows seem to be consitent
    """
    for col in columns:
        df[col] = df[col].astype(np.int32)
    return df

df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()
EN

回答 1

Stack Overflow用户

发布于 2019-10-04 18:47:39

根据多个因素执行任务的Dask计划,包括数据依赖、运行时、内存使用等。通常情况下,这些问题的答案是“让它做它自己的事”。调度不良的最常见原因是块太少。

但是,如果您显式地需要一个更重平衡的分布,那么您可以尝试使用Client.rebalance方法。

代码语言:javascript
复制
wait(df)
client.rebalance(df)

但是,请注意,再平衡并不像其他Dask操作那样健壮。最好是在没有进行大量其他工作(因此调用dask.distributed.wait )的时候这样做。

而且,我还会开始偷工作。窃取工作是负载平衡的另一个名称。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58241582

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档