首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多个小ML模型的分布式训练

多个小ML模型的分布式训练
EN

Software Engineering用户
提问于 2018-09-17 18:22:13
回答 2查看 387关注 0票数 2

我有一个数据科学应用程序,涉及到训练数以万计的小个体高斯模型。所谓“小”,我的意思是,任何单独的模型都可以很容易地在我们的一个工人服务器上训练。事实上,我们应该能够在每一个节点上同时训练几个。

我一直在探索与Yarn一起使用Spark,但是Spark似乎真的是为了训练多台机器上的大型模型,而不是包含在单个机器上的许多小型模型。

我正在想象一个工作流程,看起来像:

  1. 按键分组数据
  2. 将整组数据发送到单个工人机器,
  3. 在同一台机器上为组训练模型。
  4. 报告或保存受过训练的模型。

我需要一些关于如何实现这一点的指导。模型训练是令人尴尬的并行性。

EN

回答 2

Software Engineering用户

回答已采纳

发布于 2018-09-25 16:54:16

下面是一个MCVE示例,说明我是如何实现这个的。这完全是在PySpark V2.1.1中运行的,它借助scikit--在一些严格的假设下学习(见要求2)。

要求:

  1. 每台工人机器上都安装了scikit学习。
  2. 对于单个模型,所有数据和培训开销都可以安装在一台工作机器上。

一般工作流程是:

  1. 将DataFrame转换为RDD(K、V),其中键是组ID,值是单独的数据观察。
  2. 使用groupByKey对单个工作机器的单个键的所有数据进行洗牌
  3. 为工人机器上的每一把钥匙训练模型
  4. 收集经过训练的模型,以便序列化,以便以后检索。

对于我的应用程序,这种方法在一个大型星火库集群上运行不到几分钟。我正在训练数以万计的模特。

代码语言:javascript
复制
import numpy as np
from pyspark.sql import SparkSession
from sklearn.mixture import GaussianMixture

##
# Make the example PySpark DataFrame
##

# Generate example data
nsamps = 500
cv1_1 = np.array([[1,0],[0,1]])
cv1_2 = np.array([[2,0],[0,0.5]])
cv2_1 = np.array([[2, -1.5,],[-1.5, 2]])
cv2_2 = np.array([[2, 1.5,],[1.5, 2]])

mu1_1 = np.array([0,0])
mu1_2 = np.array([0,3])

mu2_1 = mu1_1 + np.array([5,5])
mu2_2 = mu2_1 + np.array([5,5])

# Group 1 data
x1_1 = np.matmul(np.random.randn(nsamps,2), cv1_1) + mu1_1
x1_2 = np.matmul(np.random.randn(nsamps,2), cv1_2) + mu1_2
X1 = np.concatenate([x1_1, x1_2])

# Group 2 data
# X2 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_1 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_2 = np.matmul(np.random.randn(nsamps,2), cv2_2) + mu2_2
X2 = np.concatenate([x2_1, x2_2])

# Group lables
labs = 2*nsamps*["a"] + 2*nsamps*["b"]

# Create the data frame
X = np.concatenate([X1, X2]).tolist()
dat = [(i, x[0], x[1]) for (i, x) in zip(labs, X)]
cols = ["id", "x", "y"]
df = spark.createDataFrame(dat, cols)

##
# Shuffle groups to individual workers and train models
##

# group by ids
kv = df.rdd.map(lambda r: (r.id, [r.x, r.y]))
# create a distrributed RDD where each group is localized on a single worker node
groups = kv.groupByKey()
# a single group is a tuple of id and an iterable with the data
# e.g. (u'a', <pyspark.resultiterable.ResultIterable at 0x7effd7debb90>)

# helper function to train GMMs on the data iterables
def trainGMM(data_itr):
    # Returns a trained GMM
    X = np.array(data_itr.data).astype(np.float64)
    gmm = GaussianMixture(n_components=2, covariance_type='full', tol=0.001, reg_covar=1e-06, max_iter=100,
                          n_init=1, init_params='kmeans', weights_init=None, means_init=None, precisions_init=None,
                          random_state=None, warm_start=False, verbose=0, verbose_interval=10)
    gmm.fit(X)
    return gmm

# Train GMMs
gmms = groups.mapValues(trainGMM)  # still just a transformation

##
# Collect and Serialize GMMs
##

# the trained models are small, so we can collect to a single machine
collected_gmms = gmms.collect()

# pickle models for restoring later
outRoot = "local/output/dir/"

for tup in collected_gmms:
    id = tup[0]
    gmm = tup[1]
    with open("%s/%s_gmm.pkl" % (outRoot, id), 'w') as fout:
        pickle.dump(gmm, fout)
票数 0
EN

Software Engineering用户

发布于 2018-09-18 11:57:12

没有特定的正确或错误的方法来做到这一点,因为这取决于您的项目,以及您是否能够利用您的数据结构来提高效率。

例如,对于一次性项目,您可以在所有服务器上安装必要的软件,准备工作包,为所有服务器准备SSH登录,然后使用GNU并行处理所有服务器忙于处理工作数据包。这特别适合于一种临时方法,在这种方法中,输入数据和输出模型存储为普通文件,如果您对命令行感到满意的话。

如果希望定期和自动地培训新模型,最好创建一个工作项队列,即包含所有工作项和结果的共享数据库。然后使用一些管理软件在集群的所有节点上部署自定义编写的工作服务器软件。此工作服务器等待队列中的工作数据包并将结果写入数据库。这甚至可以与一些巧妙的自动缩放相结合,以使工人的数量适应待定工作量,但对于一个简单的项目来说,这可能是过分的。

在任何一种情况下:

  1. 从编写一个简单的工作软件开始,它可以在本地训练模型。
  2. 扩展软件,以便本地计算机上的多个工作人员可以并行训练--不要使用多个线程。这可能涉及一个数据库或类似GNU并行的软件来同步工作人员。
  3. 找到一种方法来运行那些分布在多台计算机上的员工。您的辅助软件已经能够做到这一点,这一步主要是一个sysadmin (“操作”)问题。
票数 2
EN
页面原文内容由Software Engineering提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://softwareengineering.stackexchange.com/questions/378585

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档