我有一个带有行Seq[(String, String, String)]的星星之火DF。我试着用它做一些flatMap,但是我做的任何事情最后都会抛出
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema不能转换为scala.Tuple3
我可以从DF中提取一行或多行。
df.map{ r => r.getSeq[Feature](1)}.first返回
Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....RDD的数据类型似乎是正确的。
org.apache.spark.rdd.RDD[Seq[(String, String, String)]]
df的架构是
root
|-- article_id: long (nullable = true)
|-- content_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lemma: string (nullable = true)
| | |-- pos_tag: string (nullable = true)
| | |-- ne_tag: string (nullable = true)我知道这个问题与sql有关,将RDD行作为org.apache.spark.sql.Row来处理,尽管它们愚蠢地说这是一个Seq[(String, String, String)]。有一个相关的问题(链接下面),但这个问题的答案不适用于我。我对火花也不太熟悉,不知道如何把它变成一个可行的解决办法。
行是Row[Seq[(String, String, String)]]、Row[(String, String, String)]、Seq[Row[(String, String, String)]]还是更疯狂的东西。
我想做的事情是
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)它看起来很有效,但实际上
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first抛出上述错误。那么,我应该如何(例如)在每一行中获得第二个元组的第一个元素呢?
此外,为什么一直被设计为这样做,声称某物是一种类型,而实际上它不是并且不能转换为声称的类型,这似乎是愚蠢的。
相关问题:GenericRowWithSchema异常将ArrayBuffer转换为HashSet在DataFrame中从Hive表转换为RDD
相关错误报告:http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type
发布于 2016-05-31 18:38:38
它并不是说它是一个元组。它声称它是一个映射到struct的Row。
import org.apache.spark.sql.Row
case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])
val df = Seq(
Record(1L, Seq(
Feature("ancient", "jj", "o"),
Feature("olympia_greece", "nn", "location")
))
).toDF
val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))您将在火花SQL编程指南中找到精确的映射规则。
由于Row并不是非常漂亮的结构,所以您可能希望将它映射到一些有用的东西:
content.map(_.map {
case Row(lemma: String, pos_tag: String, ne_tag: String) =>
(lemma, pos_tag, ne_tag)
})或者:
content.map(_.map ( row => (
row.getAs[String]("lemma"),
row.getAs[String]("pos_tag"),
row.getAs[String]("ne_tag")
)))最后,使用Datasets提供一种更简洁的方法
df.as[Record].rdd.map(_.content_processed)或
df.select($"content_processed").as[Seq[(String, String, String)]]虽然这在这个时候似乎有点小问题。
第一种方法(Row.getAs)和第二种方法(Dataset.as)有着重要的区别。前者将对象提取为Any并应用asInstanceOf。后一种方法是使用编码器在内部类型和所需表示之间进行转换。
发布于 2019-01-16 21:14:18
object ListSerdeTest extends App {
implicit val spark: SparkSession = SparkSession
.builder
.master("local[2]")
.getOrCreate()
import spark.implicits._
val myDS = spark.createDataset(
Seq(
MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee")))
)
)
myDS.toDF().printSchema()
myDS.toDF().foreach(
row => {
row.getSeq[Row](row.fieldIndex("mylist"))
.foreach {
case Row(a, b) => println(a, b)
}
}
)
}
case class MyCaseClass (
mylist: Seq[(String, String)]
)以上代码是处理嵌套结构的另一种方法。星火默认编码器将编码TupleX,使其嵌套结构,这就是为什么您会看到这种奇怪的行为。就像其他人在评论中说的那样,您不能只做getAs[T](),因为它只是一个强制转换(x.asInstanceOf[T]),因此会给您运行时异常。
https://stackoverflow.com/questions/37553059
复制相似问题