
Python multiprocessing of a fastq function

Stack Overflow user
Asked on 2019-12-30 15:45:23
Answers: 1 · Views: 399 · Followers: 0 · Votes: 0

I'm new to the multiprocessing module in Python 3. I have two fastq files (forward and reverse), and I want to process the forward/reverse read pairs. For each forward read I fetch the corresponding reverse read and apply a function that takes several arguments. So far I have done this sequentially in a single thread, which takes very long for huge files. Now I want to speed it up by parallelizing the function application, so I split the forward file into chunks and use multiprocessing to apply the function to each chunk. Here is the code:

from Bio import SeqIO
import multiprocessing as mp
import os


def chunk_itr(iterator, chunk_size):
    """
    Split a fastq record iterator into chunks for faster processing.
    From biopython solutions
    """
    entry = True
    while entry:
        chunk = []
        while len(chunk) < chunk_size:
            try:
                entry = next(iterator)
            except StopIteration:
                entry = None
            if entry is None:
                break
            chunk.append(entry)
        if chunk:
            yield chunk


def chunk_fastq(f_fastq, chunkSize, path2out):
    rec_itr = SeqIO.parse(open(f_fastq), "fastq")
    os.mkdir(os.path.join(path2out, "chunk_files"))
    dir_out = os.path.join(path2out, "chunk_files")
    base = os.path.basename(f_fastq)
    fname = os.path.splitext(base)[0]
    for i, chunk in enumerate(chunk_itr(rec_itr, chunkSize)):
        out_chunk_name = os.path.join(dir_out, "{0}_chunk{1}.fastq".format(fname, i))
        with open(out_chunk_name, "w") as handle:
            SeqIO.write(chunk, handle, "fastq")

def testmulti(fwd_chunk, rev_idx):
    fwd_idx = SeqIO.index(fwd_chunk, "fastq")
    for i in fwd_idx:
        print(i, rev_idx[i])

pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"

def main():
    rev_idx = SeqIO.index(f_rev, "fastq")
    chunk_fastq(pathfwd, 1000, path2chunk)
    files = [os.path.join(path2chunk, f) for f in os.listdir(path2chunk)]

# sequential
    for i in files:
        testmulti(i, rev_idx)

# parallel process
    processes = []
    for i in files:
        proc = mp.Process(target=testmulti, args=(i, rev_idx,))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()
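As a side note, chunk_itr is format-agnostic, so its behaviour is easy to sanity-check without Biopython by feeding it plain integers instead of SeqRecord objects (the function body below just repeats chunk_itr from above so the snippet runs standalone):

```python
def chunk_itr(iterator, chunk_size):
    # identical to chunk_itr above, repeated so this snippet is self-contained
    entry = True
    while entry:
        chunk = []
        while len(chunk) < chunk_size:
            try:
                entry = next(iterator)
            except StopIteration:
                entry = None
            if entry is None:
                break
            chunk.append(entry)
        if chunk:
            yield chunk

print(list(chunk_itr(iter(range(7)), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```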

The sequential approach works fine, but the parallel one crashes with the following error:

Process Process-2:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 28, in testmulti
    print(i, rev_idx[i])
  File "test.py", line 28, in testmulti
    print(i, rev_idx[i])
  File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
    record = self._proxy.get(self._offsets[key])
  File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
    record = self._proxy.get(self._offsets[key])
  File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
    return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
  File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
    return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
  File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 664, in get_raw
    raise ValueError("Problem with quality section")
  File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 642, in get_raw
    raise ValueError("Premature end of file in seq section")
ValueError: Problem with quality section
ValueError: Premature end of file in seq section

Judging by the description of Biopython's Index class, something is wrong with the file format/structure, but I double-checked the input files and found no errors (and they work with the sequential approach). My guesses so far:

  1. Using processes like this is not a good choice (I also tried pool.starmap, without success)
  2. Since f_rev is indexed once and every process then tries to use that index in parallel, there is a conflict
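The second suspicion, that handing the already-built rev_idx to each child process is the problem, can be illustrated with the standard library alone. An object that keeps an open file handle cannot be pickled, which is what multiprocessing must do with arguments under the spawn start method; under fork the handle is inherited instead, and the children then share one file offset, which would be consistent with the truncated-record errors above. A minimal stand-in (FileIndex is hypothetical, not Biopython's actual class):

```python
import pickle
import tempfile

class FileIndex:
    """Hypothetical stand-in for an index object that keeps its file open."""
    def __init__(self, path):
        self.handle = open(path, "rb")  # the open handle lives on the instance

with tempfile.NamedTemporaryFile(suffix=".fastq", delete=False) as tmp:
    path = tmp.name

idx = FileIndex(path)
try:
    pickle.dumps(idx)  # what multiprocessing does to arguments under "spawn"
except TypeError as err:
    print("not picklable:", err)  # open file handles cannot be serialized
idx.handle.close()
```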

Any help would be greatly appreciated.

Thanks!


1 Answer

Stack Overflow user

Accepted answer

Answered on 2019-12-31 10:39:17

OK, so I'm still not 100% sure of the cause of the error, but after increasing the size of my fastq files I was able to reproduce it.

It is definitely related to the reverse index object created with SeqIO.index, but with the amount of inheritance going on in the source code I'm struggling to fully understand what happens. I suspect it has to do with passing an open file handle object to the child processes, but I'm not well-versed enough in that area to guarantee it.

However, I can successfully prevent the error. The fix is to move the creation of the reverse index into the child process. I can't see a good reason not to do this: the whole point of the SeqIO.index method is to create a low-memory index instead of reading the whole file into memory, so the overhead of creating one per child process should not be excessive.

def testmulti(fwd_chunk, rev):
    rev_idx = SeqIO.index(rev, "fastq")
    fwd_idx = SeqIO.index(fwd_chunk, "fastq")
    for i in fwd_idx:
        print(i, rev_idx[i])

pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"

def main():
    chunk_fastq(pathfwd, 1000, path2chunk)
    files = [os.path.join(path2chunk, f) for f in os.listdir(path2chunk)]

# sequential
    for i in files:
        testmulti(i, f_rev)

# parallel process
    processes = []
    for i in files:
        proc = mp.Process(target=testmulti, args=(i, f_rev,))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()
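The asker mentioned trying pool.starmap without success; with this fix it should also work, since only plain string paths cross the process boundary. A stdlib-only sketch of the pattern (count_lines is a placeholder for the real per-chunk work; it just counts lines instead of indexing fastq records):

```python
import multiprocessing as mp

def count_lines(chunk_path, rev_path):
    # Each worker opens its OWN handles from plain string paths, so no
    # object wrapping an open file is ever shared between processes.
    with open(chunk_path) as fwd, open(rev_path) as rev:
        return sum(1 for _ in fwd), sum(1 for _ in rev)

def process_chunks(chunk_paths, rev_path, workers=2):
    # Paths (strings) pickle cleanly; pool.starmap unpacks each tuple
    # into the two arguments of count_lines.
    with mp.Pool(workers) as pool:
        return pool.starmap(count_lines, [(c, rev_path) for c in chunk_paths])
```

With Biopython available, count_lines would instead call SeqIO.index on both paths inside the worker, exactly as in the testmulti above.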
Votes: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/59533368
