首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理时间随堆芯数的增加呈线性增长。

多处理时间随堆芯数的增加呈线性增长。
EN

Stack Overflow用户
提问于 2017-04-10 23:23:47
回答 3查看 261关注 0票数 0

我有一个arcpy进程,它需要在一堆层上进行联合,运行一些计算,并编写一个HTML。考虑到我需要生成的报告数量(~2,100),我需要这个过程尽可能快(我的目标是每个报告2秒)。我已经尝试了很多方法来实现这一点,包括多处理,当我遇到一个问题时,即运行多进程部件基本上需要花费相同的时间,无论我使用多少核。

例如,对于同样数量的报告:

  • 两个核心每轮花费30秒(所以40个报告需要40/2 *30秒)
  • 4个核用了~60秒(40/4 * 60)
  • 10个核花费了~160秒(40/10 * 160)

诸若此类。它的计算总时间是一样的,因为一次翻过两倍的时间需要两倍的时间。

是否意味着我的问题是I/O绑定,而不是CPU绑定?(如果是的话--我该怎么办?)我原以为是后者,因为我的时间上最大的瓶颈是联合(它占了处理时间的50% )。工会在ArcGIS中通常是昂贵的,所以我认为分解它并同时运行2-10会快2-10倍。或者,我可能不正确地实现多进程?

代码语言:javascript
复制
## Worker function just included to give some context

def worker(sub_code):
    layer = 'in_memory/lyr_{}'.format(sub_code)
    arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code))
    arcpy.env.extent = layer
    union_name = 'in_memory/union_' + sub_code

    arcpy.Union_analysis([fields],
                     union_name,
                     "NO_FID", "1 FEET")
    #.......Some calculations using cursors

    # Templating using Jinjah
    context = {}
    context['DATE'] = now.strftime("%B %d, %Y")
    context['SUB_CD'] = sub_code
    context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))])
    # Etc

    # Then write the report out using custom function
    write_html('template.html', 'output_folder', context)


if __name__ == '__main__':
    subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])})
    NUM_CORES = 7
    chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
    for chunk in chunk_list:
        jobs = []
        for subbasin in chunk:
            p = multiprocessing.Process(target=worker, args=(subbasin,))
            jobs.append(p)
            p.start()

        for process in jobs:
            process.join()
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-04-11 00:21:44

这里没有什么可做的,我也没有使用ArcGIS的经验。所以我只需要注意两件更高层次的事情。首先,“通常”处理此问题的方法是将NUM_CORES = 7下面的所有代码替换为:

代码语言:javascript
复制
pool = multiprocessing.Pool(NUM_CORES)
pool.map(worker, subList)
pool.close()
pool.join()

map()负责尽可能地保持所有工作进程的忙碌。如前所述,您启动了7个进程,然后等待它们全部完成。在最慢的过程之前完成的所有进程都消失了,它们的核心处于闲置状态,等待下一个外部循环迭代。Pool使这7个进程在作业期间保持活动状态,并在完成最后一段工作后立即为每个进程提供一个新的工作。

第二,这个部分以一个逻辑错误结束:

代码语言:javascript
复制
chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]

你想要的是NUM_CORES而不是NUM_CORES-1。就像-现在,第一次在你周围提取

代码语言:javascript
复制
subList[0:7]

然后

代码语言:javascript
复制
subList[6:13]

然后

代码语言:javascript
复制
subList[12:19]

诸若此类。subList[6]subList[12] (等)分别被提取两次。子列表重叠。

票数 3
EN

Stack Overflow用户

发布于 2017-04-11 00:37:45

你没有充分地向我们展示你在做什么。例如,您的env.workspace是什么?subbasinFC的价值是什么?似乎您在每个进程开始时都在进行分析,以便将数据过滤到layer中。但是,subbasinFC是来自磁盘还是来自内存?如果是来自磁盘,我建议您在任何进程尝试筛选之前将所有内容读入内存。如果你有足够的记忆来支持它的话,这应该会加快你的速度。否则,是的,你的I/O绑定在输入数据上。

请原谅我的arcpy无意义,但是为什么要在context['SUB_ACRES']之和中插入where子句?一开始你不是已经在sub_code上过滤了吗?(我们不知道工会是什么,所以也许你是在与未经过滤的东西联合.)

票数 1
EN

Stack Overflow用户

发布于 2017-04-10 23:49:21

我不确定您是否正确地使用Process池来跟踪您的作业。这是:

代码语言:javascript
复制
for subbasin in chunk:
    p = multiprocessing.Process(target=worker, args=(subbasin,))
    jobs.append(p)
    p.start()

    for process in jobs:
        process.join()

相反,应:

代码语言:javascript
复制
for subbasin in chunk:
    p = multiprocessing.Process(target=worker, args=(subbasin,))
    p.start()
    p.join()

你反对使用多处理库的规范有什么特别的原因吗?在旋转另一个进程之前,不要等到线程结束,这将创建一整组进程,而父进程没有处理这些进程。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43334385

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档