我试着用scala实现k均值方法。我创建了一个类似的RDD
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
sc.parallelize(chunk._2.toSeq).toDF()
})
val examples = df.map(dataframe =>{
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
})
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)使用此代码,我将获得一个错误
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]所以我试着去做:
val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}
val model = kmeans.run(rdd_final_Vector)但是我得到了一个错误:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector所以我在找一种方法去做那个角色,但是我找不到任何方法。
有什么想法吗?
诚挚的问候
发布于 2016-05-29 06:36:31
这里至少有几个问题:
Spark SQL理解的潜在不同类型的集合。Vector不是本机spark类型。KMeans实现的内容不匹配: SQL正在执行聚合。但是KMeans需要一系列单独的数据点的形式,一个向量(封装一个Array[Double])。那么--你为什么要向sum和average提供KMeans操作呢?仅在这里寻址#1 :您需要按以下方式做一些事情:
val doubVals = <rows rdd>.map{ row => row.getDouble("colname") }
val vector = Vectors.toDense{ doubVals.collect}然后,您有一个正确封装的Array[Double] (在向量中),可以提供给Kmeans。
https://stackoverflow.com/questions/37488722
复制相似问题