首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从spark提取``Seq[(字符串,字符串,字符串)]

从spark提取``Seq[(字符串,字符串,字符串)]
EN

Stack Overflow用户
提问于 2016-05-31 18:25:48
回答 2查看 16.6K关注 0票数 17

我有一个带有行Seq[(String, String, String)]的星星之火DF。我试着用它做一些flatMap,但是我做的任何事情最后都会抛出

org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema不能转换为scala.Tuple3

我可以从DF中提取一行或多行。

代码语言:javascript
复制
df.map{ r => r.getSeq[Feature](1)}.first

返回

代码语言:javascript
复制
Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....

RDD的数据类型似乎是正确的。

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

df的架构是

代码语言:javascript
复制
root
 |-- article_id: long (nullable = true)
 |-- content_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lemma: string (nullable = true)
 |    |    |-- pos_tag: string (nullable = true)
 |    |    |-- ne_tag: string (nullable = true)

我知道这个问题与sql有关,将RDD行作为org.apache.spark.sql.Row来处理,尽管它们愚蠢地说这是一个Seq[(String, String, String)]。有一个相关的问题(链接下面),但这个问题的答案不适用于我。我对火花也不太熟悉,不知道如何把它变成一个可行的解决办法。

行是Row[Seq[(String, String, String)]]Row[(String, String, String)]Seq[Row[(String, String, String)]]还是更疯狂的东西。

我想做的事情是

代码语言:javascript
复制
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)

它看起来很有效,但实际上

代码语言:javascript
复制
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first

抛出上述错误。那么,我应该如何(例如)在每一行中获得第二个元组的第一个元素呢?

此外,为什么一直被设计为这样做,声称某物是一种类型,而实际上它不是并且不能转换为声称的类型,这似乎是愚蠢的。

相关问题:GenericRowWithSchema异常将ArrayBuffer转换为HashSet在DataFrame中从Hive表转换为RDD

相关错误报告:http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-05-31 18:38:38

它并不是说它是一个元组。它声称它是一个映射到structRow

代码语言:javascript
复制
import org.apache.spark.sql.Row

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))

您将在火花SQL编程指南中找到精确的映射规则。

由于Row并不是非常漂亮的结构,所以您可能希望将它映射到一些有用的东西:

代码语言:javascript
复制
content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag)
})

或者:

代码语言:javascript
复制
content.map(_.map ( row => (
  row.getAs[String]("lemma"),
  row.getAs[String]("pos_tag"),
  row.getAs[String]("ne_tag")
)))

最后,使用Datasets提供一种更简洁的方法

代码语言:javascript
复制
df.as[Record].rdd.map(_.content_processed)

代码语言:javascript
复制
df.select($"content_processed").as[Seq[(String, String, String)]]

虽然这在这个时候似乎有点小问题。

第一种方法(Row.getAs)和第二种方法(Dataset.as)有着重要的区别。前者将对象提取为Any并应用asInstanceOf。后一种方法是使用编码器在内部类型和所需表示之间进行转换。

票数 20
EN

Stack Overflow用户

发布于 2019-01-16 21:14:18

代码语言:javascript
复制
object ListSerdeTest extends App {

  implicit val spark: SparkSession = SparkSession
    .builder
    .master("local[2]")
    .getOrCreate()


  import spark.implicits._
  val myDS = spark.createDataset(
    Seq(
      MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee")))
    )
  )

  myDS.toDF().printSchema()

  myDS.toDF().foreach(
    row => {
      row.getSeq[Row](row.fieldIndex("mylist"))
        .foreach {
          case Row(a, b) => println(a, b)
        }
    }
  )
}

case class MyCaseClass (
                 mylist: Seq[(String, String)]
               )

以上代码是处理嵌套结构的另一种方法。星火默认编码器将编码TupleX,使其嵌套结构,这就是为什么您会看到这种奇怪的行为。就像其他人在评论中说的那样,您不能只做getAs[T](),因为它只是一个强制转换(x.asInstanceOf[T]),因此会给您运行时异常。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37553059

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档