首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala :将Dataset[Row]映射到Dataset[Row]

Scala :将Dataset[Row]映射到Dataset[Row]
EN

Stack Overflow用户
提问于 2017-07-31 11:41:13
回答 2查看 3.1K关注 0票数 2

我正在尝试使用scala将带有数组的数据集转换为带有标签和向量的数据集,然后再将其放入机器学习中。

到目前为止,我成功地添加了一个双标签,但是我阻止了向量部分。下面是创建向量的代码:

代码语言:javascript
复制
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Dataset, Row, _}
import spark.implicits._

def toVectors(withLabelDs: Dataset[Row]) = {
val allLabel = withLabelDs.count()
var countLabel = 0
val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {
  println("schema line {}", line.schema)
  //StructType(
  //      StructField(label,DoubleType,false),
  //      StructField(code,ArrayType(IntegerType,true),true),
  //      StructField(score,ArrayType(IntegerType,true),true))
  val label = line.getDouble(0)
  val indicesList = line.getList(1)
  val indicesSize = indicesList.size
  val indices = new Array[Int](indicesSize)
  val valuesList = line.getList(2)
  val values = new Array[Double](indicesSize)
  var i = 0
  while ( {
    i < indicesSize
  }) {
    indices(i) = indicesList.get(i).asInstanceOf[Int] - 1
    values(i) = valuesList.get(i).asInstanceOf[Int].toDouble
    i += 1
  }
  var r: Row = null
  try {
    r = Row(label, Vectors.sparse(195, indices, values))
    countLabel += 1
  }
  catch {
    case e: IllegalArgumentException =>
      println("something went wrong with label {} / indices {} / values {}", label, indices, values)
      println("", e)
  }
  println("Still {} labels to process", allLabel - countLabel)
  r
})
newDataset
}

使用此代码,我得到了以下错误:

代码语言:javascript
复制
Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.
       val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {

所以很自然,我改变了我的代码

代码语言:javascript
复制
def toVectors(withLabelDs: Dataset[Row]) = {
...
}, Encoders.bean(Row.getClass))
newDataset
}

但我发现了一个错误:

代码语言:javascript
复制
error: overloaded method value map with alternatives:
[U](func: org.apache.spark.api.java.function.MapFunction[org.apache.spark.sql.Row,U],
    encoder: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.Dataset[U] 
<and>
[U](func: org.apache.spark.sql.Row => U)
    (implicit evidence$6: org.apache.spark.sql.Encoder[U])org.apache.spark.sql.Dataset[U]
    cannot be applied to (org.apache.spark.sql.Row => org.apache.spark.sql.Row, org.apache.spark.sql.Encoder[?0])
       val newDataset: Dataset[Row] = withLabelDs.map((line: Row) => {

我怎么才能把这事做好?还有datasetRow和向量一起回来了吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-07-31 19:56:48

有两件事:

.map的类型是(T => U)(implicit Encoder[U]) => Dataset[U],但看起来您是在调用它,好像是(T => U, implicit Encoder[U]) => Dataset[U],它们略有不同。不要使用.map(f, encoder),而要尝试.map(f)(encoder)

而且,我怀疑Encoders.bean(Row.getClass)能否工作,因为Row不是一个bean。一些快速的googling出现了RowEncoder,它看起来应该可以工作,但是我找不到很多关于它的文档。

票数 1
EN

Stack Overflow用户

发布于 2017-07-31 13:19:00

不幸的是,错误信息非常糟糕。import spark.implicits._只在火花壳中是正确的.它实际上意味着导入<Spark Session object>.implicits._spark恰好是火花壳中SparkSession对象的变量名。

您可以从数据集中访问SparkSession。

在方法的顶部,您可以添加导入

代码语言:javascript
复制
def toVectors(withLabelDs: Dataset[Row]) = {
  val sparkSession = withLabelIDs.sparkSession
  import sparkSession.implicits._
  //rest of method code
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45414979

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档