首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pandas-Dataframe并行应用(Swifter,TQDM::process_map)冻结?当被调用时

Pandas-Dataframe并行应用(Swifter,TQDM::process_map)冻结?当被调用时
EN

Stack Overflow用户
提问于 2021-05-04 03:04:15
回答 1查看 162关注 0票数 0

我有一个到音频文件的大约15k路径的数据帧,我想用它来执行一个操作(人为添加噪声)。一般来说,整个过程都是有效的,但即使记录较少,也需要很长时间。问题不在于函数的执行时间,而在于所有函数初始化之前的时间。

代码语言:javascript
复制
start = time.time()
data_augmented = data_augmented.swifter.progress_bar(True, desc="Merge Sounds") \
        .apply(merge_sounds(**settings), axis=1)
print(f"{time.time-(start)} - Map Timer")
代码语言:javascript
复制
Merge Sounds: 100%|█████████████████████████████| 16/16 [00:07<00:00,  2.09it/s]
26.973325729370117 - Map Timer

正如您在这里看到的,初始化所需的时间几乎是Lambda函数(merge_sounds)运行时的4倍。其中initialization-time im指的是elapsed_time_measured_by_myself - elapsed_time_measured_by_tqdm,因此在本例中为26.97.. - 7 = 19.97

代码语言:javascript
复制
    start = time.time()
    lambda_fn = merge_sounds(**settings) #doesnt work if i put it in the line below.
    data_augmented = process_map(lambda_fn, data_augmented, max_workers=threads,
                                     desc=f"Merge_sounds [{threads} Threads]")
    print(f"{time.time-(start)} - Map Timer")

被卡住了:

代码语言:javascript
复制
Merge_sounds [16 Threads]:   0%|                         | 0/16 [00:00<?, ?it/s]

代码语言:javascript
复制
    with Pool(processes=16) as pool:
        data_augmented = pool.map(merge_sounds(**settings), tqdm(data_augmented, desc=f"Merge Sounds: {16} Threads"))

被卡住了:

代码语言:javascript
复制
Merge Sounds: 16 Threads:  38%|██████          | 6/16 [00:00<00:00, 4697.75it/s]

我知道并行化对于较小的数据集是没有意义的,我只是想知道为什么我可以很容易地将代码中的所有东西都并行化,而我就是不能在这里前进。我后来在很多数据上运行了这段代码,所以如果并行可以工作,我会非常高兴。

Map中使用的函数是:

代码语言:javascript
复制
def merge_sounds(**settings):
    _range = settings.get("snr_range", (0.15, 0.65))
    assert len(_range), "snr_range -> e.g. (0.15, 0.75)"
    target_sample_rate = settings.get("target_sample_rate", "16000")

    if "target_path" not in settings.keys():
        raise Exception("please Specify target_path in Settings-Dict")
    target_path = Path(settings["target_path"])
    target_path.mkdir(parents=True, exist_ok=True)

    def __call__(item):
        _target_path = item["path_augmented"]

        snr = round(uniform(_range[0], _range[1]), 4)
        pad_idx = item.name

        yp, _ = IO.load(item["path"], sample_rate=target_sample_rate)
        yn, _ = IO.load(item["path_noise"], sample_rate=target_sample_rate)
        item["snr"] = snr

        y_augmented = Effect.add_noise(yp, yn, snr=snr, pad_idx=pad_idx)
        IO.save_wav(y_augmented, _target_path, target_sample_rate)
        return item

    return __call__

有没有什么东西可以并行化Map函数(似乎在我的代码中的任何地方都可以使用这个变体,就像预期的那样)

Ty处于高级状态。

EN

回答 1

Stack Overflow用户

发布于 2021-05-05 20:39:03

如果通过绕过TQDM修复此问题。我拉上了柱子的拉链,我需要这样的东西

代码语言:javascript
复制
    _paths_in = _df["path_input"]
    _paths_out = _df["path_output"]
    _path_noise = _df["path_noise"]
    job = zip(_paths_in, _path_noise, _paths_out, _filter_jobs)

然后我把它传递给了MultiProcessor函数。

代码语言:javascript
复制
    jobs = list(enumerate((zip_jobs(df))))

    with Pool(processes=_threads) as pool:
        data_augmented = pool.map(execute_job, tqdm(jobs, desc=f"Audio-Augmentation: {_threads} Threads"))

merge_sounds和新的execute_job是相似的,只是改变了,函数需要什么参数。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67374454

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档