Parallelizing a Python script with a Python wrapper

Stack Overflow user
Asked on 2021-01-03 23:03:46
3 answers · 230 views · 0 followers · 0 votes

I have a Python script, heavy_lifting.py, that is parallelized by a wrapper script, wrapper.sh, which invokes GNU Parallel. I use it to process files in fastq format; see example.fastq below. While this works, requiring two interpreters and two sets of dependencies is inelegant. I would like to rewrite the bash wrapper in Python while achieving the same parallelization.

example.fastq: an example of the input files that need to be processed. These input files are usually very long (on the order of 500,000,000 lines).

@SRR6750041.1 1/1
CTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
+
AAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6
@SRR6750041.2 2/1
CTATANTATTCTATATTTATTCTAGATAAAAGCATTCTATATTTAGCATATGTCTAGCAAAAAAAA
+
AAAAA#EE6EEEEEEEEEEEEAAEEAEEEEEEEEEEEE/EAE/EAE/EA/EAEAAAE//EEAEAA6
@SRR6750041.3 3/1
ATCCANAATGATGTGTTGCTCTGGAGGTACAGAGATAACGTCAGCTGGAATAGTTTCCCCTCACAG
+
AAAAA#EE6E6EEEEEE6EEEEAEEEEEEEEEEE//EAEEEEEAAEAEEEAE/EAEEA6/EEA<E/
@SRR6750041.4 4/1
ACACCNAATGCTCTGGCCTCTCAAGCACGTGGATTATGCCAGAGAGGCCAGAGCATTCTTCGTACA
+
/AAAA#EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEAE/E/<//AEA/EA//E//
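For context, each fastq record spans exactly four lines (header, sequence, separator, quality string), which is why any line-based split for parallel processing must fall on a 4-line boundary. A minimal sketch of iterating over whole records, assuming a well-formed file (the helper name fastq_records is mine, not part of the scripts below):

def fastq_records(path):
    """Yield one 4-line fastq record (header, seq, sep, qual) at a time."""
    with open(path) as fh:
        # Four references to one iterator: each zip step consumes 4 lines
        for record in zip(*([iter(fh)] * 4)):
            yield record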

Below is a minimal reproducible example of the scripts I started with.

heavy_lifting.py

#!/usr/bin/env python
import argparse

# Read in arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Iterate through input file and append to output file
with open(args.inputFastq, "r") as infile:
    with open(args.outputFastq, "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)

wrapper.sh

#!/bin/bash

NUMCORES="4"
FASTQ_F="./fastq_F.fastq"

# Split the input fastq for parallel processing. One split fastq file will be created for each core available.
split --number="l/$NUMCORES" $FASTQ_F split_fastq_F_

# Feed split fastq files to GNU Parallel to invoke parallel executions of `heavy_lifting.py`
ls split_fastq_F* | awk -F "split_fastq_F" '{print $2}' | parallel "python heavy_lifting.py -i split_fastq_F{} -o output.fastq"

# Remove intermediate split fastq files
rm split_fastq_*

To execute these scripts I use the command bash wrapper.sh. You can see that a results file, output.fastq, is created and contains the modified fastq output.

Below is my attempt to invoke the parallel processing using a Python wrapper, wrapper.py.

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing

numcores = 4
fastq_F = "fastq_F.fastq"

#Create some logic to split the input fastq file into chunks for parallel processing.  

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()
    split_size = length_fastq / numcores
    print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            counter += 1
        elif counter <= split_size:
            outfile.write(line.strip())
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()


Parallel(n_jobs=numcores)(delayed(heavy_lifting)(i, "output.fastq") for i in split_fastq_list)

Edited to improve the reproducibility of wrapper.py.

I seem to be most confused about how to correctly feed the input arguments into the Parallel call in the Python wrapper.py script. Any help is much appreciated!


3 Answers

Stack Overflow user

Accepted answer

Posted on 2021-01-03 23:14:04

Parallel needs the name of a function, not the name of a file/module.

So in heavy_lifting you have to put the code into a function (one that takes parameters instead of reading args):

def my_function(inputFastq, outputFastq):

    with open(inputFastq, "r") as infile:
        with open(outputFastq, "a") as outfile:
            for line in infile:
                outfile.write("modified" + line)

And then you can run it with:

Parallel(n_jobs=numcores)(delayed(heavy_lifting.my_function)(i, "output.fastq") for i in split_fastq_list)
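For completeness, a minimal self-contained sketch of the corrected wrapper (the chunk names here are placeholders, and the __main__ guard is my addition; it keeps the wrapper's top-level code from re-running on platforms where joblib's process-based backend re-imports the main module):

import heavy_lifting
from joblib import Parallel, delayed

if __name__ == '__main__':
    # Hypothetical chunk names produced by the splitting step
    split_fastq_list = ["split_fastq_F_0", "split_fastq_F_1"]
    Parallel(n_jobs=4)(
        delayed(heavy_lifting.my_function)(chunk, "output.fastq")
        for chunk in split_fastq_list
    )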
Votes: 1

Stack Overflow user

Posted on 2021-01-05 14:22:36

This should have been a comment, since it does not answer the question, but it is too big for a comment.

All of wrapper.sh can be written as:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --cat "python heavy_lifting.py -i {} -o output.fastq"

If heavy_lifting.py only reads the file and never seeks, this should also work and requires less disk I/O (the temporary file is replaced by a fifo):

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --fifo "python heavy_lifting.py -i {} -o output.fastq"

It will automatically detect the number of CPU threads, split the fastq file at lines starting with @SRR into one block per CPU thread, and pass the blocks to python.

If heavy_lifting.py reads from stdin when no -i is given, this should also work:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output.fastq"
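For that variant, heavy_lifting.py would need to fall back to stdin when -i is omitted; a minimal sketch (the fallback logic is an assumption, not part of the original script):

#!/usr/bin/env python
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', help='forward .fastq (defaults to stdin)')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Read from the named file if given, otherwise from stdin
infile = open(args.inputFastq) if args.inputFastq else sys.stdin
with open(args.outputFastq, "a") as outfile:
    for line in infile:
        outfile.write("modified" + line)
if args.inputFastq:
    infile.close()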

If heavy_lifting.py does not append to output.fastq, the file will be overwritten. It is therefore better to have GNU Parallel give each job a unique output name, such as output2.fastq:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output{#}.fastq"
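The per-job outputs then need to be merged afterwards; a short sketch, assuming the output{#}.fastq naming from the command above and sorting the parts by job number:

import glob
import re

# Merge output1.fastq, output2.fastq, ... in numeric job order
parts = sorted(glob.glob("output[0-9]*.fastq"),
               key=lambda p: int(re.search(r"\d+", p).group()))
with open("output_merged.fastq", "w") as merged:
    for part in parts:
        with open(part) as fh:
            merged.write(fh.read())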

For a more general FASTQ parallel wrapper, see: https://stackoverflow.com/a/41707920/363028

Votes: 1

Stack Overflow user

Posted on 2021-01-04 17:09:19

For reproducibility, I implemented the answer provided by furas in the heavy_lifting.py and wrapper.py scripts below. Additional edits were needed to make the code run, which is why I am providing the following.

heavy_lifting.py

#!/usr/bin/env python
import argparse

def heavy_lifting_fun(inputFastq, outputFastq):
    # Iterate through input file and append to output file
    outfile = open(outputFastq, "a")
    with open(inputFastq, "r") as infile:
        for line in infile:
            outfile.write("modified" + line.strip() + "\n")
    outfile.close()

if __name__ == '__main__':
    # Read in arguments only when run as a standalone script
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
    parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
    args = parser.parse_args()
    heavy_lifting_fun(args.inputFastq, args.outputFastq)

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed

numcores = 4
fastq_F = "fastq_F.fastq"

# Split the input fastq file into chunks for parallel processing.

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)

# Round the chunk size up to a multiple of 4 so no fastq record is split
split_size = length_fastq // numcores
while split_size % 4 != 0:
    split_size += 1
print(split_size)

# Iterate through the input fastq file, writing lines to split files in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            # Open the very first split file
            filename = "./split_fastq_F_" + str(split_counter)
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(line.strip() + "\n")
            counter += 1
        elif counter < split_size:
            outfile.write(line.strip() + "\n")
            counter += 1
        else:
            # Chunk is full: start a new split file and write the current line to it
            split_counter += 1
            outfile.close()
            filename = "./split_fastq_F_" + str(split_counter)
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(line.strip() + "\n")
            counter = 1
    outfile.close()

Parallel(n_jobs=numcores)(delayed(heavy_lifting.heavy_lifting_fun)(i, "output.fastq") for i in split_fastq_list)
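One caveat with this version: all workers append to the same output.fastq concurrently, so lines from different chunks can interleave in the output. A safer variant (the per-chunk output naming is my own suggestion, not part of the original answer) gives each worker its own file, replacing the last line above:

# Each worker writes its own file, e.g. ./split_fastq_F_0.out.fastq,
# so chunks never interleave; the parts can be concatenated afterwards
Parallel(n_jobs=numcores)(
    delayed(heavy_lifting.heavy_lifting_fun)(chunk, chunk + ".out.fastq")
    for chunk in split_fastq_list
)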
Votes: 0
Original question: https://stackoverflow.com/questions/65555823