首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我需要在Python中通过读取gzipped文件来使用2+进程进行CPU受限的处理

我需要在Python中通过读取gzipped文件来使用2+进程进行CPU受限的处理
EN

Stack Overflow用户
提问于 2020-08-21 01:58:15
回答 1查看 51关注 0票数 1

我有一个gzipped文件(压缩的10 to,未压缩的100 to),其中有一些由分界分隔的报告,我必须对其进行解析。解析和处理数据需要很长时间,因此是CPU限制的问题(而不是IO限制的问题)。因此,我计划使用multiprocessing模块将工作拆分成多个进程。问题是我无法有效地向子进程发送/共享数据。我使用subprocess.Popen在父进程中流式传输未压缩的数据。

代码语言:javascript
复制
process = subprocess.Popen('gunzip --keep --stdout big-file.gz',
                           shell=True, 
                           stdout=subprocess.PIPE)

我正在考虑使用Lock()来读取/解析child-process-1中的一个报告,然后释放锁,然后切换到child-process-2来读取/解析下一个报告,然后切换回子进程-1来读取/解析下一个报告)。当我将process.stdout作为args与子进程共享时,我得到了一个酸洗错误。

我曾尝试创建multiprocessing.Queue()multiprocessing.Pipe()来向子进程发送数据,但这太慢了(事实上,它比在单线程中串行完成要慢得多)。

任何关于有效地将数据发送到子进程的想法/示例都将有所帮助。

EN

回答 1

Stack Overflow用户

发布于 2020-08-21 02:05:33

你能试一下简单的东西吗?让每个工作进程运行自己的gunzip实例,根本不需要进程间通信。Worker 1可以处理第一个报告,并跳过第二个报告。worker 2的情况正好相反。每个worker跳过所有其他报告。然后是对N工作者的一个明显的概括。

或者不..。

我认为你需要更具体地说明你尝试了什么,也许还需要提供更多关于你的问题的信息(比如:有多少条记录?它们有多大?)。

下面是一个程序("genints.py"),它打印一组随机整数,每行一个,通过"xxxxx\n“分隔行将它们分成几组:

代码语言:javascript
复制
from random import randrange, seed

seed(42)
for i in range(1000):
    for j in range(randrange(1, 1000)):
        print(randrange(100))
    print("xxxxx")

因为它强制种子,所以每次都会生成相同的东西。现在有一个程序,通过我首先想到的最明显的方式,并行和串行地处理这些组。crunch()在一个组中整型数的数量是平方的,所以它非常受CPU的限制。一次运行的输出,使用(如图所示)并行部分的3个工作进程:

代码语言:javascript
复制
parallel result: 10,901,000,334 0:00:35.559782
serial   result: 10,901,000,334 0:01:38.719993

因此,并行运行花费了大约三分之一的时间。这与您的问题有哪些相关的不同?当然,完整运行"genints.py“产生的输出少于200万字节,所以这是一个主要的差异-但从这里不可能猜测这是否是相关的差异。Perahps,例如,您的问题只是非常轻微的CPU限制?从这里的输出可以明显看出,在这个程序中,将stdout块传递给工作进程的开销几乎是微不足道的。

简而言之,你可能需要给人们一个完整的程序--就像我刚才为你做的那样--他们可以运行一个完整的程序来重现你的问题。

代码语言:javascript
复制
import multiprocessing as mp

NWORKERS = 3
DELIM = "xxxxx\n"

def runjob():
    import subprocess
    # 'py' is just a shell script on my box that
    # invokes the desired version of Python -
    # which happened to be 3.8.5 for this run.
    p = subprocess.Popen("py genints.py",
                         shell=True,
                         text=True,
                         stdout=subprocess.PIPE)
    return p.stdout

# Return list of lines up to (but not including) next DELIM,
# or EOF. If the file is already exhausted, return None.
def getrecord(f):
    result = []
    foundone = False
    for line in f:
        foundone = True
        if line == DELIM:
            break
        result.append(line)
    return result if foundone else None

def crunch(rec):
    total = 0
    for a in rec:
       for b in rec:
          total += abs(int(a) - int(b))
    return total
        
if __name__ == "__main__":
    import datetime
    now = datetime.datetime.now

    s = now()
    total = 0
    f = runjob()
    with mp.Pool(NWORKERS) as pool:
        for i in pool.imap_unordered(crunch,
                                     iter((lambda: getrecord(f)), None)):
            total += i
    f.close()
    print(f"parallel result: {total:,}", now() - s)

    s = now()
    # try the same thing serially
    total = 0
    f = runjob()
    while True:
        rec = getrecord(f)
        if rec is None:
            break
        total += crunch(rec)
    f.close()
    print(f"serial   result: {total:,}", now() - s)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63510779

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档