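I need to train a linear regression model on streaming data. I read the stream with textFileStream. The problem is that RegressionMetrics takes an RDD[(Double, Double)], whereas output is a DStream[(Double, Double)]. How can I convert output to an RDD[(Double, Double)] so that I can use RegressionMetrics?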
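import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(0.0, 0.0))
  .setStepSize(0.2)
  .setNumIterations(25)
val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
model.trainOn(trainingData)
// output is a DStream[(Double, Double)] of (label, prediction) pairs
val output = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
// this is the line that fails: RegressionMetrics expects an RDD, not a DStream
val metrics = new RegressionMetrics(output)
val rmse = metrics.rootMeanSquaredError

Posted on 2016-04-28 16:43:13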
Every DStream is backed by underlying RDDs (one separate RDD per batch of data), which you can access with the foreachRDD method:
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
  val metrics = new RegressionMetrics(rdd)
  val rmse = metrics.rootMeanSquaredError
  // do something with `rmse` here
}

https://stackoverflow.com/questions/36893041
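
For completeness, here is a minimal end-to-end sketch of the foreachRDD approach. Several details are my own assumptions rather than part of the original posts: the application name, the local[2] master, the 10-second batch interval, the guard against empty batches, and the println reporting. The directory paths are copied unchanged from the question. The sketch also swaps each tuple before building the metrics, because predictOnValues emits (key, prediction) pairs, here (label, prediction), while RegressionMetrics expects (prediction, observation). RMSE is the same either way, but order-sensitive metrics such as r2 are not.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingRmse {
  def main(args: Array[String]): Unit = {
    // Assumed for illustration: app name, local master and batch interval.
    val conf = new SparkConf().setAppName("StreamingRmse").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Paths copied as-is from the question.
    val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()
    val testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.2)
      .setNumIterations(25)

    model.trainOn(trainingData)

    model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
      .foreachRDD { rdd =>
        // A batch interval with no new files yields an empty RDD; skip it
        // (assumed guard, not in the original answer).
        if (!rdd.isEmpty()) {
          // Reorder (label, prediction) into (prediction, observation).
          val predictionAndLabel = rdd.map { case (label, prediction) => (prediction, label) }
          val metrics = new RegressionMetrics(predictionAndLabel)
          println(s"batch RMSE = ${metrics.rootMeanSquaredError}")
        }
      }

    // Nothing runs until the streaming context is started.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that each RMSE value printed this way covers a single batch only. If you need a figure over the whole stream, you would have to accumulate the per-batch results yourself.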