I have code that processes roughly 60 files sequentially. To improve performance, I want to do as much of the computation in parallel as possible.
Here is my code:
def calculate():
    for file in sorted(entries.glob("*.TXT")):
        file = str(file)
        if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
            outlier.outlier_Cu_blk(file)
        elif file.endswith('.TXT') and 's' in file:
            outlier.outlier_Cu_std(file)
        else:
            outlier.outlier_Cu_sample(file)

This function calls some pandas and numpy operations.
How can I add multiprocessing here?
Edit: Roland's suggestion works so far; however, "sorted" no longer has any effect, because some files finish faster than others, and the order is crucial. And since I use several functions, the "if __name__ == '__main__'" guard doesn't work / I don't know how to make it work.
Here are some more details:
This is my full code:
def calculate(elements):
    if elements == available_elements[1]:  ## this is just a dummy for further calculations later on
        entries = Path(inf)
        for file in entries.glob("*.exp"):
            print(file)
            outlier.outlier_Fe(file)
        print(calculate_Fe)
    elif elements == available_elements[2]:  ## this is the main/only working function right now
        print('Processing the ' + available_elements[2] + ' Isotope System')
        time.sleep(1)
        print('Start with outlier correction')
        entries = Path(inf)
        for file in sorted(entries.glob("*.TXT")):
            file = str(file)
            if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
                outlier.outlier_Cu_blk(file)
            elif file.endswith('.TXT') and 's' in file:
                outlier.outlier_Cu_std(file)
            else:
                outlier.outlier_Cu_sample(file)
        print('Outlier correction finished')
        """Insert Header"""
        outname = os.path.join(outfile_results, 'Cu_header.csv')
        fullname = os.path.join(outfile_results, 'Cu_export.csv')
        reread = pd.read_csv(fullname, sep='\t', names=['Time', 'Filename', '60Ni', '61Ni', '62Ni', '63Cu', '64Ni', '65Cu', '66Zn'], index_col=False)
        reread.to_csv(fullname, sep='\t', header=True, index=False)
        print('Calculate d65/63Cu with SSB Method')
        resultname = os.path.join(outfile_results, 'Cu_delta.csv')
        ssb.ssb()
        print('Processing of ' + available_elements[2] + ' Isotope System finished')
        print('File "' + outname + '" contains the raw isotope values')
        print('File "' + resultname + '" contains the d65/63Cu values')

For example, this happens in "outlier.py":
def outlier_Cu_sample(file, append=True):
    data = pd.read_csv(file, sep='\t', names=['Time', '60Ni', '61Ni', '62Ni', '63Cu', '64Ni', '65Cu', '66Zn'], skiprows=6, nrows=80, index_col=False, dtype=float)
    cols = list(data.drop(columns='Time').columns)
    datao = pd.DataFrame({'Time': data['Time']})
    datao[cols] = data[cols].where(np.abs(stats.zscore(data[cols])) < 2)
    datao.to_csv(outfile_corrected_raw_data + basename + '.csv', sep='\t', header=True, index_label='Index_name')  ## this creates single files in a folder, again in the right order because of the filename
    # This below inserts the mean value of all files into a single csv file. If I do it sequentially, it is in the right order because of "sorted". With multiprocessing it is in the wrong order. #
    mean_filtered_transposed = pd.DataFrame(data=np.mean(datao)).T
    mean_filtered_transposed['Time'] = pd.to_datetime(mean_filtered_transposed["Time"], unit='s')
    clean = mean_filtered_transposed.drop(mean_filtered_transposed.columns[[0]], axis=1)
    clean.insert(0, 'Inputfile', file)
    if append:
        clean.to_csv(fullname, sep='\t', mode="a", header=False, index_label='Index_name')
    else:
        clean.to_csv(fullname, sep='\t', mode="w", header=True, index_label='Index_name')

How can I get this processed in the right order with multiprocessing? I would like to move the "mean_filtered" step into a separate function that runs sequentially. But how do I call the main function with multiprocessing then? And in the future, I need to be able to choose the "element".
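One common way around the ordering problem (a sketch, not code from the original post: `process_one` and the file list are illustrative stand-ins) is to have each worker return its result instead of appending to the shared CSV itself, and let the parent write the rows in submission order via Pool.imap:

```python
import multiprocessing as mp

def process_one(file):
    # Hypothetical worker: compute and return the per-file result
    # instead of appending to the shared CSV inside the worker.
    file = str(file)
    return f"mean row for {file}"  # stand-in for the real pandas mean row

if __name__ == '__main__':
    # Stand-in for sorted(entries.glob("*.TXT")).
    files = ['a_s.TXT', 'b_Blk.TXT', 'c_s.TXT']
    with mp.Pool() as pool:
        # imap (unlike imap_unordered) yields results in submission
        # order, so the parent can append rows to the CSV sequentially.
        for row in pool.imap(process_one, files):
            print(row)
```

Applied to the code above, `outlier_Cu_sample` would return `clean` rather than calling `to_csv` with `mode="a"`, and the parent process would do the appending, so the aggregate file keeps the sorted order even though the workers finish in arbitrary order.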
Edit 2: This is the multiprocessing function in /modules:
import sys
sys.path.append('')
import multiprocessing as mp
import modules.config as conf

infile = (conf.WorkspaceVariableInput)

def Cu(file):
    global infile
    global outfile
    inf = infile
    global outfile_results
    global outfile_corrected_raw_data
    global outfile_plt
    file = str(file)
    if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
        outlier.outlier_Cu_blk(file)
    elif file.endswith('.TXT') and 's' in file:
        outlier.outlier_Cu_std(file)
    else:
        outlier.outlier_Cu_sample(file)
    return f"finished processing {file}"

entries = Path(infile)

if __name__ == '__main__':
    data = sorted(entries.glob("*.TXT"))
    with mp.Pool() as p:
        for res in p.imap_unordered(Cu, data):
            print(res)

It is called by this function from the folder /:
import modules.config as conf
import modules.Cu as Cu

def element(elements):
    global infile
    global outfile
    # global available_elements
    # global calc
    inf = infile
    global outfile_results  # = outfile + '/results'
    global outfile_corrected_raw_data
    global outfile_plt
    global export
    if elements == available_elements[2]:
        print('Processing the ' + available_elements[2] + ' Isotope System')
        time.sleep(1)
        print('Start with outlier correction')
        Cu.Cu()
        print('Outlier correction finished')
        """Insert Header"""
        Cu.Cu()

chooseelement = element(str('Cu'))

Posted on 2022-04-01 07:04:38
Remove calculate from the for loop, but put sorted(entries.glob("*.TXT")) into a list. Change calculate so that it accepts a single argument, the filename.
Then create a multiprocessing.Pool. Run the calculate method with Pool.imap_unordered, using the list as the argument.
The main reason for using imap_unordered is that it returns results in the order in which they finish, while map returns them in the order in which they were submitted. As a result, imap_unordered tends to finish faster. If keeping the submission order is important, use map.
import multiprocessing as mp

def calculate(file):
    if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
        outlier.outlier_Cu_blk(file)
    elif file.endswith('.TXT') and 's' in file:
        outlier.outlier_Cu_std(file)
    else:
        outlier.outlier_Cu_sample(file)
    return f"finished processing {file}"

if __name__ == '__main__':
    data = sorted(entries.glob("*.TXT"))
    with mp.Pool() as p:
        for res in p.imap_unordered(calculate, data):
            print(res)

Edit:
You should only use multiprocessing objects in the main script, i.e. where __name__ == '__main__' is True. The reason can be found in the programming guidelines for multiprocessing in the documentation.
On ms-windows and macOS, multiprocessing spawns a new Python interpreter for every multiprocessing.Process or Pool worker. (On ms-windows because it lacks the fork system call found on POSIX systems, and on macOS because some macOS frameworks do not always work well with fork.) This spawn start method requires that the original code can be imported by the freshly started Python interpreter without side effects, such as trying to start a new process. If this requirement is not met, the result is a cascade of Python processes.
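To make that concrete, here is a minimal spawn-safe layout (a sketch; `worker` and `main` are illustrative names, not from the code above): the worker function lives at module top level so a freshly spawned interpreter can import it, and the Pool is created only under the __main__ guard.

```python
import multiprocessing as mp

def worker(name):
    # Top-level function: importable by a freshly spawned interpreter.
    return name.upper()

def main():
    items = ['blk', 'std', 'sample']
    with mp.Pool() as pool:
        results = pool.map(worker, items)  # map preserves input order
    print(results)

if __name__ == '__main__':
    # Creating the Pool only under this guard means importing the
    # module (as spawn does on Windows/macOS) starts no new processes.
    main()
```

With this structure, the module-level code runs harmlessly in every worker process, and only the original script ever creates a Pool.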
https://stackoverflow.com/questions/71702910