我试图注册一个Spark,以帮助我从一个表中转换一个XML字符串,但是我得到了以下异常。我能知道我错过了什么吗?我正在使用Scala2.12.10和Spark2.4.4。
package org.mt.experiments
import org.apache.spark.sql.SparkSession
import scala.xml.transform.{RewriteRule, RuleTransformer}
import scala.xml.{Elem, Node, NodeSeq}
object Launcher2 {
case class Student(name: String, books: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[*]")
.enableHiveSupport
.getOrCreate
import spark.implicits._
spark.udf.register("cleanXML", (xmlDoc: Node) => {
new RuleTransformer(new RewriteRule {
override def transform(node: Node): NodeSeq = node match {
case e: Elem if e.label == "author" => NodeSeq.Empty
case node => node
}
}).transform(xmlDoc)
})
val andy = Student(
name = "Andy",
books = "<books><book><<title>Functional Programming in Scala</title><author>Paul Chiusano and Runar Bjarnason</author><year>2014-12-26</year></book><book><title>Real and Complex Analysis</title><author>Walter Rudin</author><year>2015-05-19</year></book></books>"
)
val studentDF = Seq(andy).toDF()
studentDF.createOrReplaceTempView("studentDetails")
val tokDF = spark.sql("SELECT name, cleanXML(books) as books FROM studentDetails")
tokDF.show(false)
}
}错误
Exception in thread "main" scala.MatchError: scala.xml.Node (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:760)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:926)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:925)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:740)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:761)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:926)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:925)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:740)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:736)
at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:192)
at org.mt.experiments.Launcher2$.main(Launcher2.scala:22)
at org.mt.experiments.Launcher2.main(Launcher2.scala)发布于 2019-11-04 13:39:18
线程"main“scala.MatchError: scala.xml.Node (类为scala.xml.Node)中的异常
错误说明scala.xml.Node不支持作为输入或输出的UDF函数的一种类型。
根据RuleTransformer.transform的刻度
转换(.):SeqNode
因此,您希望注册为cleanXML用户定义函数的以下代码返回不受支持的类型,从而返回异常。
(xmlDoc: Node) => {
new RuleTransformer(new RewriteRule {
override def transform(node: Node): NodeSeq = node match {
case e: Elem if e.label == "author" => NodeSeq.Empty
case node => node
}
}).transform(xmlDoc)
}您可以简单地使用.headOption.map(_.toString).getOrElse(...)或类似的方法来解决它。
https://stackoverflow.com/questions/58645708
复制相似问题