我正在寻找有关在Spark1.6ML库中实现并行LBFGS和OWLQN算法的文档。
我为1.6:http://spark.apache.org/docs/1.6.1/ml-advanced.html找到了这个页面,但没有任何关于并行化的内容
对于2.0:http://spark.apache.org/docs/2.0.0/ml-advanced.html,但仍然没有任何关于并行化的内容
最后,我阅读了代码link1。该方法
def train(dataset: DataFrame): LogisticRegressionModel似乎使用Breeze优化模型,但我找不到火花函数的调用位置(map、flatMap、reduce、.)。
在代码link2中,map用于计算被简化为计算梯度的子梯度.
谢谢
发布于 2016-08-01 20:38:07
总之,Spark使用了Breeze LBFGS和OWLQN优化算法,并为它们提供了一种在每次迭代时计算成本函数梯度的方法。
例如,Spark的LogisticRegression类使用了扩展Breeze的DiffFunction特性的LogisticCostFun类。这个代价函数类实现了具有签名的calculate抽象方法:
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double])计算方法使用了一个LogisticAggregator类,这是完成实际工作的地方。聚合类定义了两个重要的方法:
def add(instance: Instance): this.type // the gradient update equation is hard-coded here
def merge(other: LogisticAggregator): this.type // just adds other's gradient to the current gradientadd方法定义了在添加单个数据点后更新梯度的方法,合并方法定义了将两个单独的聚合器组合在一起的方法。这个类被传送到执行器,用于聚合每个数据分区,然后用于将所有分区聚合器组合成一个聚合器。最后一个聚合器实例保存当前迭代的累积梯度,并用于更新驱动节点上的系数。此过程由调用treeAggregate类中的LogisticCostFun来控制:
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
instances.treeAggregate(
new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
)(seqOp, combOp)
}您可以这样简单地想一想:微风实现了几种不同的优化方法(例如,LBFGS、OWLQN),并且只需要告诉优化方法如何计算梯度。火花告诉微风算法如何通过LogisticCostFun类计算梯度。LogisticCostFun只是说将一个LogisticAggregator实例发布到每个分区,收集梯度更新,然后将它们发送回驱动程序上。
https://stackoverflow.com/questions/38702412
复制相似问题