首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并行处理数据

并行处理数据
EN

Stack Overflow用户
提问于 2020-10-14 22:33:45
回答 1查看 1.2K关注 0票数 4

我有一个进程,需要处理每一行的数据,然后在每一行中追加一个新的值。这是一个很大的数据文件,一次需要几个小时来处理一个数据。

如果我有一个迭代循环,它将每一行发送到一个函数,我是否可以将我的处理过程中的加速比平分?行的结果不相关。

基本上我的代码是这样的

代码语言:javascript
复制
for index, row in df.iterrows():
   row['data'] = function[row]

有什么简单的方法可以加快处理速度吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-14 22:47:15

虽然迭代行不是很好的实践,而且可以有与grouby/transform聚合等不同的逻辑,但是如果在最坏的情况下,您确实需要这样做,那么请按照答案来做。此外,你可能不需要在这里重新实现所有的东西,你可以使用像达斯克这样的库,它是建立在熊猫之上的。

但是,为了给出一个想法,您可以将multiprocessing (Pool.map)与chunking结合使用。读取块中的csv (或按答案末尾提到的做块),并将其映射到池中,在处理每个块时添加新行(或将它们添加到列表中并创建新块)并从函数中返回。

最后,在执行所有池时合并数据格式。

代码语言:javascript
复制
import pandas as pd
import numpy as np
import multiprocessing


def process_chunk(df_chunk):
        
        for index, row in df_chunk.reset_index(drop = True).iterrows():
                    #your logic for updating this chunk or making new chunk here
                         
                    print(row)
                    
                    print("index is " + str(index))
        #if you can added to same df_chunk, return it, else if you appended
        #rows to have list_of_rows, make a new df with them and return
        #pd.Dataframe(list_of_rows)  

        return df_chunk   


if __name__ == '__main__':
            #use all available cores , otherwise specify the number you want as an argument,
            #for example if you have 12 cores,  leave 1 or 2 for other things
            pool = multiprocessing.Pool(processes=10) 
            
            results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
            pool.close()
            pool.join()
            
            #make new df by concatenating
            
            concatdf = pd.concat(results, axis=0, ignore_index=True)

注意事项:与阅读csv不同,您可以按照相同的逻辑传递卡盘,要计算块大小,您可能需要类似于round_of( (length of df) / (number of core available-2)) (例如每块100000/14 = round(7142.85) = 7150 rows )的内容。

代码语言:javascript
复制
 results = pool.map(process_chunk,
        [df[c:c+chunk_size] for c in range(0,len(df),chunk_size])
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64362395

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档