首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用python的多处理方法在keras中并行化模型预测

使用python的多处理方法在keras中并行化模型预测
EN

Stack Overflow用户
提问于 2020-03-28 19:20:14
回答 1查看 10.2K关注 0票数 3

我试图使用model.predict命令并行地执行模型预测,该命令由python2中的keras提供。对于python2,我使用tensorflow 1.14.0。我有5个模型(.h5)文件,并且希望parallel.This中运行的predict命令在python2.7中运行。我使用多处理池将模型文件名与多个进程的预测函数进行映射,如下所示,

代码语言:javascript
复制
import matplotlib as plt
import numpy as np
import cv2
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)

输入是从代码的另一部分获取的图像数字数组。但在执行过程中我得到了以下信息,

代码语言:javascript
复制
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
    return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'

我无法解释这条错误信息,如何解决这个问题?任何建议都非常感谢!

更新2:感谢所有的指针和完整的示例@sokato。我执行@sokato发布的确切代码,但是我得到了以下错误(我在代码中也做了修改,并得到了如下所示的相同错误),

代码语言:javascript
复制
Traceback (most recent call last):
  File "stackoverflow.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__

UPDATE3:,谢谢你的支持,我认为UPDATE2中的问题是因为使用python2而不是python3。我能够解决UPDATE2 for python2中给出的错误,方法是使用with closing(multiprocessing.Pool()) as p:而不是@sokato的代码中的with multiprocessing.Pool() as p:。导入关闭函数如下所示:from contextlib import closing

使用以下不同方法发布的新问题,

我实际上有多个输入。与每次为每个输入加载模型不同,我希望先加载所有模型,然后将其保存在列表中。我做了如下所示,

代码语言:javascript
复制
import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count) 
    return max_freqq

#....some pre-processing....#

    '''new_all_t is a list which contains tuples and each tuple has inputs from all_t 
    and the list containing loaded models which will be extracted
 in the prediction function.'''

new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))

new_all_t是一个包含元组的列表,每个元组都有来自all_t的输入,以及包含将在预测function.However中提取的加载模型的列表,我现在得到以下错误,

代码语言:javascript
复制
Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.

这究竟说明了什么?我该怎么解决这个问题?

UPDATE4: I在一开始就包含了tf.compat.v1.enable_eager_execution()tf.compat.v1.enable_v2_behavior()这两个行。现在我得到了以下错误,

代码语言:javascript
复制
WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.

我无法解释这条错误信息,如何解决这个问题?任何建议都非常感谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-28 23:16:44

所以,我不确定你的一些设计选择,但我给了它最好的尝试与给定的信息。具体来说,我认为在并行函数中,全局变量和import语句可能存在一些问题。

  1. 您应该使用共享变量而不是全局变量来共享进程之间的输入。如果您希望在教程中生成的多处理documentation.
  2. I模型中阅读更多有关共享内存的内容,您可以阅读更多有关共享内存的内容,因为您的模型不包括在内。

  1. ,您不是要加入或关闭池,但是使用下面的代码,我能够成功地使代码并行执行。您可以通过调用pool.close()或使用下面所示的" with“语法来关闭池。注意,with语法不适用于python2.7.

代码语言:javascript
复制
import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
               loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=5)

    for mod in models:
        model.save(mod)

def prediction(model_name):

    model=tf.keras.models.load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double, 
    int(np.prod(testShape)),lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p:  #Use me for python 3
    p = multiprocessing.Pool() #Use me for python 2.7
    start_time=time.time()
    res=p.map(prediction,models)
    p.close() #Use me for python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)

我希望这能帮到你。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60905801

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档