I am new to the multiprocessing module in Python 3. I have two fastq files (forward and reverse), and I want to process forward/reverse read pairs. Starting from a forward read, I fetch the corresponding reverse read and apply a function that takes several arguments. So far I have done this sequentially in a single thread, which takes quite long for huge files. Now I would like to speed things up by parallelizing the function application, so I split the forward file into chunks and use multiprocessing to apply the function to each chunk. Here is the code:
import os
import multiprocessing as mp

from Bio import SeqIO

def chunk_itr(iterator, chunk_size):
    """
    Split a fastq iterator into smaller chunks for faster processing.
    Adapted from Biopython solutions.
    """
    entry = True
    while entry:
        chunk = []
        while len(chunk) < chunk_size:
            try:
                entry = next(iterator)
            except StopIteration:
                entry = None
            if entry is None:
                break
            chunk.append(entry)
        if chunk:
            yield chunk
def chunk_fastq(f_fastq, chunkSize, path2out):
    rec_itr = SeqIO.parse(open(f_fastq), "fastq")
    dir_out = os.path.join(path2out, "chunk_files")
    os.mkdir(dir_out)
    base = os.path.basename(f_fastq)
    fname = os.path.splitext(base)[0]
    for i, chunk in enumerate(chunk_itr(rec_itr, chunkSize)):
        out_chunk_name = os.path.join(dir_out, "{0}_chunk{1}.fastq".format(fname, i))
        with open(out_chunk_name, "w") as handle:
            SeqIO.write(chunk, handle, "fastq")
def testmulti(fwd_chunk, rev_idx):
    fwd_idx = SeqIO.index(fwd_chunk, "fastq")
    for i in fwd_idx:
        print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
    rev_idx = SeqIO.index(f_rev, "fastq")
    chunk_fastq(pathfwd, 1000, path2chunk)
    files = [f for f in os.listdir(path2chunk)]
    # sequential
    for i in files:
        testmulti(i, rev_idx)
    # parallel process
    processes = []
    for i in files:
        proc = mp.Process(target=testmulti, args=(i, rev_idx))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()

The sequential approach works fine, but the parallel one crashes with the following error:
Process Process-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "test.py", line 28, in testmulti
print(i, rev_idx[i])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/File.py", line 417, in __getitem__
record = self._proxy.get(self._offsets[key])
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 69, in get
return self._parse(StringIO(_bytes_to_string(self.get_raw(offset))))
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 664, in get_raw
raise ValueError("Problem with quality section")
File "/home/user/.local/lib/python3.6/site-packages/Bio/SeqIO/_index.py", line 642, in get_raw
raise ValueError("Premature end of file in seq section")
ValueError: Problem with quality section
ValueError: Premature end of file in seq section

From the description of the Index class in Biopython, this suggests something is wrong with the file format/structure, but I double-checked the input files and found no errors (and they work with the sequential approach). My guess so far is that something about using processes like this does not play well with the index object.
Any help would be greatly appreciated.
Thanks!
Posted on 2019-12-31 10:39:17
OK, so I'm still not 100% sure of the cause of the error, but after increasing the size of my fastq files I was able to reproduce it.
It is definitely related to the reverse index object created with SeqIO.index; however, with the amount of inheritance going on, I am struggling to fully follow what happens in the source code. I suspect it has to do with passing an open file handle object to the child processes, but I am not versed enough in that area to guarantee it.
I can, however, successfully prevent the error. The fix is to move the creation of the reverse index into the child process. I see no good reason not to do this: the whole point of SeqIO.index is to create a low-memory index rather than read the entire file into memory, so the overhead of creating one per child process should not be excessive.
def testmulti(fwd_chunk, rev):
    rev_idx = SeqIO.index(rev, "fastq")
    fwd_idx = SeqIO.index(fwd_chunk, "fastq")
    for i in fwd_idx:
        print(i, rev_idx[i])
pathfwd = "path/to/forward_file"
f_rev = "path/to/rev_fastq"
def main():
    chunk_fastq(pathfwd, 1000, path2chunk)
    files = [f for f in os.listdir(path2chunk)]
    # sequential
    for i in files:
        testmulti(i, f_rev)
    # parallel process
    processes = []
    for i in files:
        proc = mp.Process(target=testmulti, args=(i, f_rev))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()

https://stackoverflow.com/questions/59533368
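As a side note, spawning one Process per chunk file can create a very large number of concurrent processes for big inputs. A multiprocessing.Pool caps the number of workers and collects results in order. Here is a minimal, Biopython-free sketch of the same chunk-and-map pattern; process_chunk is a hypothetical stand-in for the per-chunk work (in the real code, each worker would build its own SeqIO.index there, as in the fix above):

```python
from itertools import islice
from multiprocessing import Pool

def chunk_itr(iterator, chunk_size):
    """Yield successive lists of up to chunk_size items (stdlib version of the snippet above)."""
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk

def process_chunk(chunk):
    # Stand-in for the per-chunk work; a real worker would open its own
    # indexes here instead of inheriting them from the parent process.
    return sum(chunk)

if __name__ == "__main__":
    chunks = list(chunk_itr(iter(range(10)), 4))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    with Pool(processes=2) as pool:               # at most 2 workers, however many chunks
        results = pool.map(process_chunk, chunks)
    print(results)                                # -> [6, 22, 17]
```

Pool.map also propagates worker exceptions back to the parent, which makes failures like the one above easier to see than with bare Process objects.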