首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多进程多文件

多进程多文件
EN

Stack Overflow用户
提问于 2022-04-01 06:53:49
回答 1查看 62关注 0票数 0

我有一个按顺序处理大约60个文件的代码。为了提高性能,我想尽可能多地并行计算。

这是我的密码:

代码语言:javascript
复制
def calculate:
      for file in sorted(entries.glob("*.TXT")):
            file = str(file)
            if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
                outlier.outlier_Cu_blk(file)
            elif file.endswith('.TXT') and 's' in file:
                outlier.outlier_Cu_std(file)
            else:
                outlier.outlier_Cu_sample(file)

这个函数调用了一些熊猫和矮胖的操作。

如何在这里插入多处理?

编辑: Roland的建议至今仍然有效,但是,“排序”不再有效,因为有些文件比其他文件更快。但这是至关重要的。由于我使用了几个函数,"if = main“不起作用/我不知道如何使它工作

这里有更多的细节:

这是我的完整代码:

代码语言:javascript
复制
def calculate(elements):       

    elif elements == available_elements[1]: ## this is just a dummy for further calculations later on

        entries = Path(inf)
        for file in entries.glob("*.exp"):
            print(file)
            outlier.outlier_Fe(file)
            print(calculate_Fe)

    elif elements == available_elements[2]: ## this is the main/only working function right now
        print('Processing the ' + available_elements[2] + ' Isotope System')
        time.sleep(1)
        print('Start with outlier correction')
        entries = Path(inf)
        for file in sorted(entries.glob("*.TXT")): 
            file = str(file)
            if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
                outlier.outlier_Cu_blk(file)
            elif file.endswith('.TXT') and 's' in file:
                outlier.outlier_Cu_std(file)
            else:
                outlier.outlier_Cu_sample(file)
        print('Outlier correction finished')
        """Insert Header"""        
        outname = os.path.join(outfile_results, 'Cu_header.csv')
        fullname = os.path.join(outfile_results, 'Cu_export.csv')
        reread = pd.read_csv(fullname, sep='\t', names = ['Time', 'Filename', '60Ni', '61Ni', '62Ni', '63Cu', '64Ni', '65Cu', '66Zn'], index_col=False)
        reread.to_csv(fullname, sep = '\t', header=True, index=False)
        print('Calculate d65/63Cu with SSB Method')
        resultname = os.path.join(outfile_results, 'Cu_delta.csv')
        ssb.ssb()
        print('Processing of ' + available_elements[2] + ' Isotope System finished')
        print('File "'+ outname +'" contains the raw isotope values')
        print('File "'+ resultname +'" contains the d65/63Cu values')

例如,这发生在"outlier.py“中。

代码语言:javascript
复制
def outlier_Cu_sample(file, append=True):
data = pd.read_csv(file, sep='\t', names=['Time', '60Ni', '61Ni', '62Ni', '63Cu', '64Ni', '65Cu', '66Zn'], skiprows=6, nrows=80, index_col=False, dtype=float)
    
    cols = list(data.drop(columns='Time').columns)
    datao = pd.DataFrame({'Time':data['Time']})
    datao[cols] = data[cols].where(np.abs(stats.zscore(data[cols])) < 2)
    datao.to_csv(outfile_corrected_raw_data + basename + '.csv', sep='\t', header = True, index_label='Index_name') ## this creates single files in a folder again in the right order because of the filename

#This below here inserts the mean value of all files in a single csv file. If I do it in sequence, this is because of "sorted" in the right order. With multiprocessing it is in the wrong order. #

mean_filtered_transposed = pd.DataFrame(data=np.mean(datao)).T
mean_filtered_transposed['Time'] = pd.to_datetime(mean_filtered_transposed["Time"], unit='s')
clean = mean_filtered_transposed.drop(mean_filtered_transposed.columns[[0]], axis=1) 
clean.insert(0, 'Inputfile', file)

if append:
    clean.to_csv(fullname, sep='\t', mode="a", header=False, index_label='Index_name')
else:
    clean.to_csv(fullname, sep='\t', mode="w", header=True, index_label='Index_name')

如何通过多处理将其按正确的顺序处理?我想把"mean_filtered“的步骤放在另一个函数中,这个函数是按顺序运行的。但是如何使用多处理调用主函数呢?在未来,我需要能够选择“元素”。

编辑2:这是/modules中的多处理函数:

代码语言:javascript
复制
import sys
sys.path.append('')
import multiprocessing as mp
import modules.config as conf

infile = (conf.WorkspaceVariableInput)
def Cu(file):
    global infile
    global outfile
    inf = infile
    global outfile_results 
    global outfile_corrected_raw_data
    global outfile_plt
    

    file = str(file)
    if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
        outlier.outlier_Cu_blk(file)
    elif file.endswith('.TXT') and 's' in file:
        outlier.outlier_Cu_std(file)
    else:
        outlier.outlier_Cu_sample(file)
    return f"finished processing {file}"

entries = Path(infile)
if __name__ == '__main__':
    data = sorted(entries.glob("*.TXT"))

    with mp.Pool() as p:
        for res in p.imap_unordered(Cu, data):
            print(res)

它由这个函数从文件夹/调用

代码语言:javascript
复制
import modules.config as conf
import modules.Cu as Cu

def element(elements):
global infile
    global outfile
   # global available_elements
   # global calc
    inf = infile
    global outfile_results #= outfile + '/results'
    global outfile_corrected_raw_data
    global outfile_plt
    global export

    if elements == available_elements[2]:
        print('Processing the ' + available_elements[2] + ' Isotope System')
        time.sleep(1)
        print('Start with outlier correction')
        Cu.Cu()
        print('Outlier correction finished')
        """Insert Header"""
        Cu.Cu()

chooseelement = element(str('Cu'))
EN

回答 1

Stack Overflow用户

发布于 2022-04-01 07:04:38

for循环中删除calculate,但将sorted(entries.glob("*.TXT"))放在列表中。将calculate更改为接受单个参数,即文件名。

然后创建一个multiprocessing.Pool。使用Pool.imap_unordered和列表作为参数运行calculate方法。

使用imap_unordered的主要原因是它返回的结果是它们完成的顺序,而map返回的是它们提交的顺序。因此,imap_unordered的完成速度往往更快。如果保持提交的顺序很重要,请使用map

代码语言:javascript
复制
import multiprocessing as mp


def calculate(file):
        if file.endswith('Blk.TXT') or file.endswith('BLK_.TXT'):
            outlier.outlier_Cu_blk(file)
        elif file.endswith('.TXT') and 's' in file:
            outlier.outlier_Cu_std(file)
        else:
            outlier.outlier_Cu_sample(file)
        return f"finished processing {file}"

if __name__ == '__main__':
    data = sorted(entries.glob("*.TXT"))

    with mp.Pool() as p:
        for res in p.imap_unordered(calculate, data):
            print(res)

编辑:

您应该只在主脚本中使用multiprocessing对象,即__name__ == '__main__'位于True的位置。原因在文档中的multiprocessing编程指南中找到。

在ms-windows和macOS平台上,mulitprocessing为每个multiprocessing.Process或池工作人员生成了一个新的multiprocessing.Process解释器。(这是因为macOS没有在POSIX系统中找到fork系统调用,而且一些macOS框架在fork中并不总是很好地工作。)这个start方法需要,原始代码可以由新启动的Python解释器导入,而不会产生副作用,比如尝试启动一个新进程。如果不进行检查,如果不满足这一要求,将导致一连串的Python进程。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71702910

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档