我有一个gzipped文件(压缩的10 to,未压缩的100 to),其中有一些由分界分隔的报告,我必须对其进行解析。解析和处理数据需要很长时间,因此是CPU限制的问题(而不是IO限制的问题)。因此,我计划使用multiprocessing模块将工作拆分成多个进程。问题是我无法有效地向子进程发送/共享数据。我使用subprocess.Popen在父进程中流式传输未压缩的数据。
process = subprocess.Popen('gunzip --keep --stdout big-file.gz',
shell=True,
stdout=subprocess.PIPE)我正在考虑使用Lock()来读取/解析child-process-1中的一个报告,然后释放锁,然后切换到child-process-2来读取/解析下一个报告,然后切换回子进程-1来读取/解析下一个报告)。当我将process.stdout作为args与子进程共享时,我得到了一个酸洗错误。
我曾尝试创建multiprocessing.Queue()和multiprocessing.Pipe()来向子进程发送数据,但这太慢了(事实上,它比在单线程中串行完成要慢得多)。
任何关于有效地将数据发送到子进程的想法/示例都将有所帮助。
发布于 2020-08-21 02:05:33
你能试一下简单的东西吗?让每个工作进程运行自己的gunzip实例,根本不需要进程间通信。Worker 1可以处理第一个报告,并跳过第二个报告。worker 2的情况正好相反。每个worker跳过所有其他报告。然后是对N工作者的一个明显的概括。
或者不..。
我认为你需要更具体地说明你尝试了什么,也许还需要提供更多关于你的问题的信息(比如:有多少条记录?它们有多大?)。
下面是一个程序("genints.py"),它打印一组随机整数,每行一个,通过"xxxxx\n“分隔行将它们分成几组:
from random import randrange, seed
seed(42)
for i in range(1000):
for j in range(randrange(1, 1000)):
print(randrange(100))
print("xxxxx")因为它强制种子,所以每次都会生成相同的东西。现在有一个程序,通过我首先想到的最明显的方式,并行和串行地处理这些组。crunch()在一个组中整型数的数量是平方的,所以它非常受CPU的限制。一次运行的输出,使用(如图所示)并行部分的3个工作进程:
parallel result: 10,901,000,334 0:00:35.559782
serial result: 10,901,000,334 0:01:38.719993因此,并行运行花费了大约三分之一的时间。这与您的问题有哪些相关的不同?当然,完整运行"genints.py“产生的输出少于200万字节,所以这是一个主要的差异-但从这里不可能猜测这是否是相关的差异。Perahps,例如,您的问题只是非常轻微的CPU限制?从这里的输出可以明显看出,在这个程序中,将stdout块传递给工作进程的开销几乎是微不足道的。
简而言之,你可能需要给人们一个完整的程序--就像我刚才为你做的那样--他们可以运行一个完整的程序来重现你的问题。
import multiprocessing as mp
NWORKERS = 3
DELIM = "xxxxx\n"
def runjob():
import subprocess
# 'py' is just a shell script on my box that
# invokes the desired version of Python -
# which happened to be 3.8.5 for this run.
p = subprocess.Popen("py genints.py",
shell=True,
text=True,
stdout=subprocess.PIPE)
return p.stdout
# Return list of lines up to (but not including) next DELIM,
# or EOF. If the file is already exhausted, return None.
def getrecord(f):
result = []
foundone = False
for line in f:
foundone = True
if line == DELIM:
break
result.append(line)
return result if foundone else None
def crunch(rec):
total = 0
for a in rec:
for b in rec:
total += abs(int(a) - int(b))
return total
if __name__ == "__main__":
import datetime
now = datetime.datetime.now
s = now()
total = 0
f = runjob()
with mp.Pool(NWORKERS) as pool:
for i in pool.imap_unordered(crunch,
iter((lambda: getrecord(f)), None)):
total += i
f.close()
print(f"parallel result: {total:,}", now() - s)
s = now()
# try the same thing serially
total = 0
f = runjob()
while True:
rec = getrecord(f)
if rec is None:
break
total += crunch(rec)
f.close()
print(f"serial result: {total:,}", now() - s)https://stackoverflow.com/questions/63510779
复制相似问题