首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >通过在python中使用并行处理来加速内存使用率较高的进程中的例程

通过在python中使用并行处理来加速内存使用率较高的进程中的例程
EN

Stack Overflow用户
提问于 2018-09-04 18:05:55
回答 1查看 74关注 0票数 0

我有一个程序,它使用了大量的内存(150 to的向量,用nmslib索引),并且我在并行执行代码时遇到了麻烦。我的机器有40个内核,我尝试将其并行化的尝试到目前为止还没有成功。程序员首先加载向量,然后根据向量准备一些数据(这部分很好,性能也很好,因为大部分工作都是由nmslib完成的,它本身就是多线程的)。现在,当我对nmslib加载到RAM中的数据进行后处理时,问题开始出现了。我正在迭代一个包含500个条目的列表,每个条目代表一个文件的数据。我用来处理这些数据并尝试并行执行的代码如下:

代码语言:javascript
复制
def tib_result_turn_to_file(data):
fileindex = data[0]
main_bucket = data[1]
result_string = ""
print("Now processing: " + fileindex[0])
print(abs(fileindex[1]-fileindex[2]))
#print(len(results))
c = fileindex[1]
c1 = 0
while c < fileindex[2]:
    if main_bucket == "tengyur1":
        tibwords = tibwords_tengyur1
    if main_bucket == "tengyur2":
        tibwords = tibwords_tengyur2
    if main_bucket == "kangyur":
        tibwords = tibwords_kangyur
    result_string += "\n" + main_bucket + "#" + fileindex[0] + " " + str(c)
    for result in data[2][c1]:
        bucket = result[2]
        if bucket == "tengyur1":
            tibwords = tibwords_tengyur1
        if bucket == "tengyur2":
            tibwords = tibwords_tengyur2
        if bucket == "kangyur":
            tibwords = tibwords_kangyur
        bucket = result[2]
        result_position = result[1]
        result_score = result[0]
        # we don't want results that score too low or that are identical with the source:
        if result_score < 0.03 and (result_position < c- 20 or result_position > c + 20):
                result_string += "\t"  + bucket + "#" + tibwords[result_position][0] + "#" + str(result_position)
    c += 1
    c1 += 1
with open("/mnt/output_parallel/" + fileindex[0][:-4] + "_parallel_index.org", "w") as text_file:
    text_file.write(result_string)

以tibword开头的列表是每个5000万个条目的巨大列表。它们是在育儿例程中定义的,并且不会在此例程中更改,因此我假设它们不会被复制。现在,输入到这个例程中的每一批数据本身都不小,如果我腌制它,我平均可能会得到500mb。由于此例程的唯一目的是通过在其执行结束时写入一个文件来产生副作用,而且由于它不会修改任何可能与其他线程共享的数据,因此我认为将其并行化应该是非常简单的。然而,到目前为止,没有任何东西起作用。我尝试过的:

代码语言:javascript
复制
Parallel(n_jobs=40,backend="threading")(delayed(tib_result_turn_to_file)(i,bucket) for i in files)

这似乎创建了很多线程,但它们似乎不会做太多事情。我猜是GIL妨碍了我们,最多只能使用一个内核。

代码语言:javascript
复制
Parallel(n_jobs=40)(delayed(tib_result_turn_to_file)(i,bucket) for i in files)

这将会中断,因为它会抱怨内存使用情况。如果我添加了选项require='sharedmem‘,它将会运行,但随后它会像前一次尝试一样慢。第三种解决方案:

代码语言:javascript
复制
pool = multiprocessing.Pool(processes=4)
pool.map(tib_result_turn_to_file,files,bucket)
pool.close()

将失败并显示OOM。不过,我不明白为什么。在例程中访问的数据都是只读的,即使我将例程简化为:

代码语言:javascript
复制
def tib_result_turn_to_file(data):
    print("Hello world")

池将失败,并显示OOM。这是因为我加载了程序前一部分中的大量索引(此时这些索引仍在内存中,但不再使用)?如果这应该是原因,有没有可能绕过这个问题?我的方法到底对不对?我想知道我是否应该将这个程序一分为二,首先使用nmslib进行向量运算,然后在第二步进行后处理,但在我看来,这会增加很多不必要的复杂性。

EN

回答 1

Stack Overflow用户

发布于 2018-09-05 13:22:32

实际上,解决方案是我必须删除所有以前创建的nmslib索引,然后才能并行化例程的进一步执行,即使这些变量没有在新进程中使用。似乎python不能不将所有内容都复制到新进程中。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52163808

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档