I'm trying to use Scala to convert a dataset containing arrays into a dataset with a label and a vector, before feeding it into machine learning.
So far I have managed to add a double label, but I'm stuck on the vector part. Below is the code that creates the vector:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Dataset, Row, _}
import spark.implicits._
def toVectors(withLabelDs: Dataset[Row]) = {
  val allLabel = withLabelDs.count()
  var countLabel = 0
  val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {
    println("schema line {}", line.schema)
    // StructType(
    //   StructField(label,DoubleType,false),
    //   StructField(code,ArrayType(IntegerType,true),true),
    //   StructField(score,ArrayType(IntegerType,true),true))
    val label = line.getDouble(0)
    val indicesList = line.getList(1)
    val indicesSize = indicesList.size
    val indices = new Array[Int](indicesSize)
    val valuesList = line.getList(2)
    val values = new Array[Double](indicesSize)
    var i = 0
    while (i < indicesSize) {
      indices(i) = indicesList.get(i).asInstanceOf[Int] - 1
      values(i) = valuesList.get(i).asInstanceOf[Int].toDouble
      i += 1
    }
    var r: Row = null
    try {
      r = Row(label, Vectors.sparse(195, indices, values))
      countLabel += 1
    } catch {
      case e: IllegalArgumentException =>
        println("something went wrong with label {} / indices {} / values {}", label, indices, values)
        println("", e)
    }
    println("Still {} labels to process", allLabel - countLabel)
    r
  })
  newDataset
}

With this code, I got the following error:
Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {

So naturally, I changed my code to:
def toVectors(withLabelDs: Dataset[Row]) = {
  ...
  }, Encoders.bean(Row.getClass))
  newDataset
}

But then I got this error:
error: overloaded method value map with alternatives:
[U](func: org.apache.spark.api.java.function.MapFunction[org.apache.spark.sql.Row,U],
encoder: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.Dataset[U]
<and>
[U](func: org.apache.spark.sql.Row => U)
(implicit evidence$6: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.Dataset[U]
cannot be applied to (org.apache.spark.sql.Row => org.apache.spark.sql.Row, org.apache.spark.sql.Encoder[?0])
val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {

How can I get this working? And how do I get back a Dataset of rows containing the label together with the vector?
Posted on 2017-07-31 19:56:48
Two things:

The type of .map is (T => U)(implicit Encoder[U]) => Dataset[U], but it looks like you are calling it as if it were (T => U, implicit Encoder[U]) => Dataset[U], which is subtly different. Instead of .map(f, encoder), try .map(f)(encoder).

Also, I doubt Encoders.bean(Row.getClass) will work, since Row is not a bean. Some quick googling turned up RowEncoder, which looks like it should work, though I couldn't find much documentation on it.
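A minimal sketch of that suggestion, assuming Spark 2.x, where RowEncoder lives in org.apache.spark.sql.catalyst.encoders and takes the output schema as a StructType. The schema and the field name "features" are mine, not from the question; VectorType is the question's existing import:

```scala
// Sketch, not the answer's code: pass the encoder explicitly in the
// second parameter list, built from the schema of the *output* rows.
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Output rows are (label: Double, features: Vector); field names assumed.
val outputSchema = StructType(Seq(
  StructField("label", DoubleType, nullable = false),
  StructField("features", VectorType, nullable = false)))

val newDataset: Dataset[Row] =
  withLabelDs.map((line: Row) => {
    // ... build label, indices and values exactly as in the question ...
    Row(label, Vectors.sparse(195, indices, values))
  })(RowEncoder(outputSchema))
```

The key point is the curried call .map(f)(encoder): the encoder goes in the second parameter list, where the implicit would otherwise be resolved.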
Posted on 2017-07-31 13:19:00
Unfortunately, the error message is quite misleading. import spark.implicits._ is only correct as written inside the spark-shell. It really means import <SparkSession object>.implicits._; spark just happens to be the variable name of the SparkSession object in the spark-shell.

You can access the SparkSession from your Dataset.

At the top of your method, you can add the import:
def toVectors(withLabelDs: Dataset[Row]) = {
  val sparkSession = withLabelDs.sparkSession
  import sparkSession.implicits._
  // rest of method code

Source: https://stackoverflow.com/questions/45414979