In my machine learning project I use gunicorn with multiple workers. The problem is that when I send a train request, only the worker that receives that request ends up with the newly trained model after training finishes. It is worth mentioning that, to keep inference fast, I wrote the program so that each model is loaded once, right after training; that is why only the worker that handled the current training operation loads the latest model, while the other workers keep serving the previously loaded one. The model files (in binary format) are loaded once after each training into a global dictionary, where the key is the model name and the value is the model object. Obviously this problem would disappear if I read the model from disk for every single prediction, but I cannot do that because it would make predictions too slow.
I looked further into global variables, and that research showed that in a multiprocessing environment every worker (process) creates its own copy of the globals. Besides the binary model files, I also need to keep some other global variables (of dictionary type) in sync across all the processes. So, how do I handle this situation?
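The per-worker caching pattern described above can be sketched roughly as follows; `load_model_from_disk` is a hypothetical stand-in for the real loader (e.g. `fasttext.load_model`):

```python
# Global cache: model name -> loaded model object.
# Each gunicorn worker process gets its own copy of this dict,
# which is exactly why workers fall out of sync after a retrain.
MODELS = {}

def load_model_from_disk(name):
    # Stand-in for the real loader; returns a dummy object here.
    return f"model object for {name}"

def get_model(name):
    """Load the model once per worker and reuse it for fast inference."""
    if name not in MODELS:
        MODELS[name] = load_model_from_disk(name)
    return MODELS[name]

print(get_model("fasttext"))  # loaded from disk on first call, cached afterwards
```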
TL;DR: I need some way to store variables that are shared by all the processes (workers). Is there any way to do this, e.g. with multiprocessing.Manager, dill, etc.?
Update 1: My project contains multiple machine learning algorithms, each with its own model file. The models are loaded into memory in a dictionary where the key is the model name and the value is the corresponding model object, and I need to share them (in other words, I need to share that dictionary). However, some models, such as FastText, are not pickle-serializable. So when I try multiprocessing.Manager with a proxy variable (in my case a dictionary to hold the models), I get an error for those non-pickle-serializable objects as soon as I assign a loaded model file into the dictionary, e.g.: can't pickle fasttext_pybind.fasttext objects. More about multiprocessing.Manager can be found here: Proxy Objects
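For values that are pickle-serializable, a Manager dict does propagate updates between processes, which is why this approach breaks only on models like FastText. A minimal sketch, with a plain bytes value standing in for a model:

```python
import multiprocessing

def publish_model(shared):
    # Values assigned into a manager dict are pickled and stored in the
    # manager's server process, so every process holding a proxy sees them.
    shared["model"] = b"picklable model bytes"

if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    shared = mgr.dict()
    proc = multiprocessing.Process(target=publish_model, args=(shared,))
    proc.start()
    proc.join()
    print(shared["model"])  # b'picklable model bytes'
```

Assigning a FastText object instead of bytes fails at exactly the pickling step shown in the comment.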
Here is a summary of what I did:
import multiprocessing
import fasttext
mgr = multiprocessing.Manager()
model_dict = mgr.dict()
model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")
model_dict["fasttext"] = model_file # This line throws the error:
can't pickle fasttext_pybind.fasttext objects
I printed the model_file I was trying to assign, and it is:
<fasttext.FastText._FastText object at 0x7f86e2b682e8>
Update 2: Based on this answer, I modified the code slightly:
import fasttext
from multiprocessing.managers import SyncManager
def Manager():
    m = SyncManager()
    m.start()
    return m
# As the model file has a type of "<fasttext.FastText._FastText object at 0x7f86e2b682e8>" so, using "fasttext.FastText._FastText" as the class of it
SyncManager.register("fast", fasttext.FastText._FastText)
# Now this is the Manager as a replacement of the old one.
mgr = Manager()
ft = mgr.fast() # This line raises an error
This gives me an EOFError.
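For contrast, SyncManager.register does work when the registered class is an ordinary picklable Python class; a minimal sketch using a hypothetical Counter class:

```python
from multiprocessing.managers import SyncManager

class Counter:
    """A plain picklable class, used only to illustrate SyncManager.register."""
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get(self):
        return self.value

# Register the class so the manager can instantiate it in its server process.
SyncManager.register("Counter", Counter)

if __name__ == "__main__":
    mgr = SyncManager()
    mgr.start()
    counter = mgr.Counter()  # proxy to a Counter living in the manager process
    counter.increment()      # method calls are forwarded through the proxy
    print(counter.get())     # 1
    mgr.shutdown()
```

The FastText case fails because the underlying `fasttext_pybind` object cannot make the pickling round-trips that the proxy machinery relies on.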
Update 3: I tried using dill with both multiprocessing and multiprocess. A summary of the changes follows:
import multiprocessing
import multiprocess
import dill
# Any one of the following two lines
mgr = multiprocessing.Manager() # Or,
mgr = multiprocess.Manager()
model_dict = mgr.dict()
... ... ...
... ... ...
model_file = dill.dumps(model_file) # This line throws the error
model_dict["fasttext"] = model_file
... ... ...
... ... ...
# During loading
model_file = dill.loads(model_dict["fasttext"])
But it still raises the same error: can't pickle fasttext_pybind.fasttext objects.
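Before trying any Manager- or dill-based approach, it can help to check whether an object survives a pickle round-trip at all; a small stdlib-only sketch:

```python
import pickle

def is_picklable(obj):
    """Return True if obj survives a pickle round-trip, False otherwise."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

print(is_picklable({"weights": [0.1, 0.2]}))  # True
print(is_picklable(lambda x: x))              # False: lambdas are not picklable
```

A FastText model fails this check for the same reason it fails inside the manager: the pybind-backed object does not implement pickling.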
Update 4: This time I used yet another serialization library. Serialization and deserialization seemed to work correctly (no problems were reported at runtime), but surprisingly, after deserialization, every prediction ended in a segmentation fault. More details and the steps to reproduce it can be found here: Segmentation fault (core dumped)
Posted on 2022-12-04 03:36:39
For the sake of completeness, here is the solution that worked for me. All my attempts to serialize FastText were in vain. In the end, as mentioned in the comments, I managed to share with the other workers, via redis-pubsub, the message that the model should be loaded from disk. Obviously it does not actually share the model from the same memory space; it only sends the other workers a message telling them that they should reload the model from disk (because a new training has just finished). Here is the general solution:
# redis_pubsub.py
import logging
import os
import fasttext
import socket
import threading
import time

"""The whole purpose of GLOBAL_NAMESPACE is to keep the whole pubsub mechanism separate,
as another service might also be publishing on the same channel.
"""
GLOBAL_NAMESPACE = "SERVICE_0"

def get_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP

class RedisPubSub:

    def __init__(self):
        self.redis_client = get_redis_client()  # TODO: A SAMPLE METHOD WHICH CAN RETURN YOUR REDIS CLIENT (you have to implement)
        # The unique ID identifies which worker on which server is the publisher,
        # so that a worker can ignore messages that were in fact sent by itself.
        self.unique_id = "IP_" + get_ip() + "__" + str(GLOBAL_NAMESPACE) + "__" + "PID_" + str(os.getpid())

    def listen_to_channel_and_update_models(self, channel):
        try:
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe(channel)
        except Exception as exception:
            logging.error(f"REDIS_ERROR: Model Update Listening: {exception}")

        while True:
            try:
                message = pubsub.get_message()
                # Successful operation gives 1 and unsuccessful gives 0;
                # we are not interested in receiving these flags
                if message and message["data"] != 1 and message["data"] != 0:
                    message = message["data"].decode("utf-8")
                    message = str(message)
                    splitted_msg = message.split("__SEPERATOR__")
                    # Not only make sure the message is coming from another worker,
                    # but also that the sender and receiver (i.e. both workers) are under the same namespace
                    if (splitted_msg[0] != self.unique_id) and (splitted_msg[0].split('__')[1] == GLOBAL_NAMESPACE):
                        algo_name = splitted_msg[1]
                        model_path = splitted_msg[2]
                        # Fasttext
                        if "fasttext" in algo_name:
                            try:
                                # TODO: YOU WILL GET THE LOADED NEW FILE IN model_file. USE IT TO UPDATE THE OLD ONE.
                                model_file = fasttext.load_model(model_path + '.bin')
                            except Exception as exception:
                                logging.error(exception)
                            else:
                                logging.info(f"{algo_name} model is updated for process with unique_id: {self.unique_id} by process with unique_id: {splitted_msg[0]}")
                time.sleep(1)  # sleep for 1 second to avoid hammering the CPU too much
            except Exception as exception:
                time.sleep(1)
                logging.error(f"PUBSUB_ERROR: Model or component update: {exception}")

    def publish_to_channel(self, channel, algo_name, model_path):
        def _publish_to_channel():
            try:
                message = self.unique_id + '__SEPERATOR__' + str(algo_name) + '__SEPERATOR__' + str(model_path)
                time.sleep(3)
                self.redis_client.publish(channel, message)
            except Exception as exception:
                logging.error(f"PUBSUB_ERROR: Model or component publishing: {exception}")

        # The delay before publishing would block subsequent, independent activities,
        # hence the publishing is done in another thread.
        thread = threading.Thread(target=_publish_to_channel)
        thread.start()
Additionally, you also have to start the listener:
import threading

from redis_pubsub import RedisPubSub

pubsub = RedisPubSub()

# start the listener:
thread = threading.Thread(target=pubsub.listen_to_channel_and_update_models, args=("sync-ml-models", ))
thread.start()
In the fasttext training module, after training finishes, publish this message to the other workers, so that they get a chance to reload the model from disk:
# fasttext_api.py
from redis_pubsub import RedisPubSub

pubsub = RedisPubSub()
pubsub.publish_to_channel(channel = "sync-ml-models",  # a sample name for the channel
                          algo_name = f"fasttext",
                          model_path = "path/to/fasttext/model")
https://stackoverflow.com/questions/69430747