首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我正在使用Dask在多个数据集上使用Snorkel应用LabelingFunction,但这似乎需要很长时间。这是正常的吗?

我正在使用Dask在多个数据集上使用Snorkel应用LabelingFunction,但这似乎需要很长时间。这是正常的吗?
EN

Stack Overflow用户
提问于 2021-02-06 06:03:29
回答 1查看 70关注 0票数 1

我的问题如下:我有几个csv格式的数据集(900K,1M7和1M7条目),我将它们加载到多个Dask Dataframe中。然后,我将它们连接到一个Dask Dataframe中,我可以将它们提供给我的Snorkel应用程序,它将一组标记函数应用到我的Dataframe的每一行,并返回一个numpy数组,其中的行和列的数量与Dataframe中的行和标签函数的数量相同。

当我使用3个数据集(超过2天...)时,对Snorkel Applier的调用似乎永远都要花费很长时间。然而,如果我只使用第一个数据集运行代码,调用大约需要2个小时。当然,我不会执行连接步骤。

所以我想知道这是怎么回事?我应该更改连接的Dataframe中的分区数量吗?或者也许我一开始就把Dask用得不好?

下面是我使用的代码:

代码语言:javascript
复制
from snorkel.labeling.apply.dask import DaskLFApplier
import dask.dataframe as dd
import numpy as np
import os

start = time.time()

applier = DaskLFApplier(lfs)  # lfs are the function that are going to be applied, one of them featurize one of the column of my Dataframe and apply a sklearn classifier (I put n_jobs to None when loading the model)

# If I have only one CSV to read
if isinstance(PATH_TO_CSV, str):
    training_data = dd.read_csv(PATH_TO_CSV, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'})
    slices = None
 
# If I have several CSV  
elif isinstance(PATH_TO_CSV, list):
    training_data_list = [dd.read_csv(path, lineterminator=os.linesep, na_filter=False, dtype={'size': 'int32'}) for path in PATH_TO_CSV]
    training_data = dd.concat(training_data_list, axis=0)

    # some useful things I do to know where to slice my final result and be sure I can assign each part to each dataset
    df_sizes = [len(df) for df in training_data_list]
    cut_idx = np.insert(np.cumsum(df_sizes), 0, 0)
    slices = list(zip(cut_idx[:-1], cut_idx[1:]))

# The call that lasts forever: I tested all the code above without that line on my 3 datasets and it runs perfectly fine
L_train = applier.apply(training_data)

end = time.time()
print('Time elapsed: {}'.format(timedelta(seconds=end-start)))

如果你需要更多的信息,我会尽我所能给你。预先感谢您的帮助:)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-02-06 12:56:00

似乎默认情况下,applier函数使用的是进程,因此不会从可用的额外工作进程中受益:

代码语言:javascript
复制
# add this to the beginning of your code
from dask.distributed import Client
client = Client()
# you can see the address of the client by typing `client` and opening the dashboard

# skipping your other code

# you need to pass the client explicitly to the applier
# after launching this open the dashboard and watch the workers work :)
L_train = applier.apply(training_data, scheduler=client)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66071214

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档