我有一个数据科学应用程序,涉及到训练数以万计的小个体高斯模型。所谓“小”,我的意思是,任何单独的模型都可以很容易地在我们的一个工人服务器上训练。事实上,我们应该能够在每一个节点上同时训练几个。
我一直在探索与Yarn一起使用Spark,但是Spark似乎真的是为了训练多台机器上的大型模型,而不是包含在单个机器上的许多小型模型。
我正在想象一个工作流程,看起来像:
我需要一些关于如何实现这一点的指导。模型训练是令人尴尬的并行性。
发布于 2018-09-25 16:54:16
下面是一个MCVE示例,说明我是如何实现这个的。这完全是在PySpark V2.1.1中运行的,它借助scikit--在一些严格的假设下学习(见要求2)。
要求:
一般工作流程是:
groupByKey对单个工作机器的单个键的所有数据进行洗牌对于我的应用程序,这种方法在一个大型星火库集群上运行不到几分钟。我正在训练数以万计的模特。
import numpy as np
from pyspark.sql import SparkSession
from sklearn.mixture import GaussianMixture
##
# Make the example PySpark DataFrame
##
# Generate example data
nsamps = 500
cv1_1 = np.array([[1,0],[0,1]])
cv1_2 = np.array([[2,0],[0,0.5]])
cv2_1 = np.array([[2, -1.5,],[-1.5, 2]])
cv2_2 = np.array([[2, 1.5,],[1.5, 2]])
mu1_1 = np.array([0,0])
mu1_2 = np.array([0,3])
mu2_1 = mu1_1 + np.array([5,5])
mu2_2 = mu2_1 + np.array([5,5])
# Group 1 data
x1_1 = np.matmul(np.random.randn(nsamps,2), cv1_1) + mu1_1
x1_2 = np.matmul(np.random.randn(nsamps,2), cv1_2) + mu1_2
X1 = np.concatenate([x1_1, x1_2])
# Group 2 data
# X2 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_1 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_2 = np.matmul(np.random.randn(nsamps,2), cv2_2) + mu2_2
X2 = np.concatenate([x2_1, x2_2])
# Group lables
labs = 2*nsamps*["a"] + 2*nsamps*["b"]
# Create the data frame
X = np.concatenate([X1, X2]).tolist()
dat = [(i, x[0], x[1]) for (i, x) in zip(labs, X)]
cols = ["id", "x", "y"]
df = spark.createDataFrame(dat, cols)
##
# Shuffle groups to individual workers and train models
##
# group by ids
kv = df.rdd.map(lambda r: (r.id, [r.x, r.y]))
# create a distrributed RDD where each group is localized on a single worker node
groups = kv.groupByKey()
# a single group is a tuple of id and an iterable with the data
# e.g. (u'a', <pyspark.resultiterable.ResultIterable at 0x7effd7debb90>)
# helper function to train GMMs on the data iterables
def trainGMM(data_itr):
# Returns a trained GMM
X = np.array(data_itr.data).astype(np.float64)
gmm = GaussianMixture(n_components=2, covariance_type='full', tol=0.001, reg_covar=1e-06, max_iter=100,
n_init=1, init_params='kmeans', weights_init=None, means_init=None, precisions_init=None,
random_state=None, warm_start=False, verbose=0, verbose_interval=10)
gmm.fit(X)
return gmm
# Train GMMs
gmms = groups.mapValues(trainGMM) # still just a transformation
##
# Collect and Serialize GMMs
##
# the trained models are small, so we can collect to a single machine
collected_gmms = gmms.collect()
# pickle models for restoring later
outRoot = "local/output/dir/"
for tup in collected_gmms:
id = tup[0]
gmm = tup[1]
with open("%s/%s_gmm.pkl" % (outRoot, id), 'w') as fout:
pickle.dump(gmm, fout)发布于 2018-09-18 11:57:12
没有特定的正确或错误的方法来做到这一点,因为这取决于您的项目,以及您是否能够利用您的数据结构来提高效率。
例如,对于一次性项目,您可以在所有服务器上安装必要的软件,准备工作包,为所有服务器准备SSH登录,然后使用GNU并行处理所有服务器忙于处理工作数据包。这特别适合于一种临时方法,在这种方法中,输入数据和输出模型存储为普通文件,如果您对命令行感到满意的话。
如果希望定期和自动地培训新模型,最好创建一个工作项队列,即包含所有工作项和结果的共享数据库。然后使用一些管理软件在集群的所有节点上部署自定义编写的工作服务器软件。此工作服务器等待队列中的工作数据包并将结果写入数据库。这甚至可以与一些巧妙的自动缩放相结合,以使工人的数量适应待定工作量,但对于一个简单的项目来说,这可能是过分的。
在任何一种情况下:
https://softwareengineering.stackexchange.com/questions/378585
复制相似问题