我有一个进程,需要处理每一行的数据,然后在每一行中追加一个新的值。这是一个很大的数据文件,一次需要几个小时来处理一个数据。
如果我有一个迭代循环,它将每一行发送到一个函数,我是否可以将我的处理过程中的加速比平分?行的结果不相关。
基本上我的代码是这样的
for index, row in df.iterrows():
row['data'] = function[row]有什么简单的方法可以加快处理速度吗?
发布于 2020-10-14 22:47:15
虽然迭代行不是很好的实践,而且可以有与grouby/transform聚合等不同的逻辑,但是如果在最坏的情况下,您确实需要这样做,那么请按照答案来做。此外,你可能不需要在这里重新实现所有的东西,你可以使用像达斯克这样的库,它是建立在熊猫之上的。
但是,为了给出一个想法,您可以将multiprocessing (Pool.map)与chunking结合使用。读取块中的csv (或按答案末尾提到的做块),并将其映射到池中,在处理每个块时添加新行(或将它们添加到列表中并创建新块)并从函数中返回。
最后,在执行所有池时合并数据格式。
import pandas as pd
import numpy as np
import multiprocessing
def process_chunk(df_chunk):
for index, row in df_chunk.reset_index(drop = True).iterrows():
#your logic for updating this chunk or making new chunk here
print(row)
print("index is " + str(index))
#if you can added to same df_chunk, return it, else if you appended
#rows to have list_of_rows, make a new df with them and return
#pd.Dataframe(list_of_rows)
return df_chunk
if __name__ == '__main__':
#use all available cores , otherwise specify the number you want as an argument,
#for example if you have 12 cores, leave 1 or 2 for other things
pool = multiprocessing.Pool(processes=10)
results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
pool.close()
pool.join()
#make new df by concatenating
concatdf = pd.concat(results, axis=0, ignore_index=True)注意事项:与阅读csv不同,您可以按照相同的逻辑传递卡盘,要计算块大小,您可能需要类似于round_of( (length of df) / (number of core available-2)) (例如每块100000/14 = round(7142.85) = 7150 rows )的内容。
results = pool.map(process_chunk,
[df[c:c+chunk_size] for c in range(0,len(df),chunk_size])https://stackoverflow.com/questions/64362395
复制相似问题