首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark SQL中的udf

Spark SQL中的udf
EN

Stack Overflow用户
提问于 2015-10-14 12:51:17
回答 2查看 7.6K关注 0票数 1

我有两个数据帧: dataDf和regexDf。dataDf有大量记录,而regexDf有两列正则表达式。我的问题是,我需要根据regexDef中的两列匹配正则表达式的两列来过滤dataDf。我想出了这个

代码语言:javascript
复制
dataDf.registerTempTable("dataTable")
sqlContext.udf.register("matchExpressionCombination", matchExpressionCombination _)

val matchingResults = sqlContext.sql("SELECT * FROM dataTable WHERE matchExpressionCombination(col1, col2)")
def matchExpressionCombination(col1Text: String, col2Text: String): Boolean = {
  val regexDf = getRegexDf()
  var isMatch = false
  for(row <- regexDf.collect) {
    if(col1Text.matches(row(0).toString) && col2Text.matches(row(1).toString)) {
      isMatch = true
    }
  }
  isMatch
}

当我说

代码语言:javascript
复制
matchingResults.count().println

​我得到以下错误:-

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#118L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
   Union
    TungstenProject
     Filter UDF(Col1Text#97,Col2Text#109)
      Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
    TungstenProject
     Filter UDF(Col1Text#2,Col2Text#14)
      Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]


    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1403)
    at com.salesforce.hammer.clusterer.collector.DeduperTest$.applyBaselineAndUpgradeOnlyPatternTest(DeduperTest.scala:83)
    at com.salesforce.hammer.clusterer.collector.Application$.main(Main.scala:53)
    at com.salesforce.hammer.clusterer.collector.Application.main(Main.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition
 TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
  Union
   TungstenProject
     Filter UDF(Col1Text#97,Col2Text#109)
      Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
    TungstenProject
     Filter UDF(Col1Text#2,Col2Text#14)
      Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 15 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
 Union

TungstenProject
     Filter UDF(Col1Text#97,Col2Text#109)
      Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
    TungstenProject
     Filter UDF(Col1Text#2,Col22Text#14)
      Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]


    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 23 more
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
    at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:309)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:184)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 31 more
Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper
Serialization stack:
    - object not serializable (class: com.salesforce.RegexDeduper, value: com.salesforce.RegexDeduper@67b464f4)
    - field (class: com.salesforce.RegexDeduper$$anonfun$applyCol1TextAndCol2TextPattern$1, name: $outer, type: class com.salesforce.RegexDeduper)
    - object (class com.salesforce.RegexDeduper$$anonfun$applyBaselineAndUpgradePattern$1, <function2>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, name: func$3, type: interface scala.Function2)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(Col1Text#97,Col2Text#109))
    - field (class: org.apache.spark.sql.execution.Filter, name: condition, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.execution.Filter, Filter UDF(Col1Text#97,Col2Text#109)
 Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2TText#109,UpgradedStacktrace#110,UpgradedTime#111]
)
    - field (class: org.apache.spark.sql.execution.Filter$$anonfun$4, name: $outer, type: class org.apache.spark.sql.execution.Filter)
    - object (class org.apache.spark.sql.execution.Filter$$anonfun$4, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
EN

回答 2

Stack Overflow用户

发布于 2015-12-16 14:39:17

不能在UDF中使用collect操作,因为所有数据都会被发送到该节点,并且collect应该仅用于在spark-shell类型的环境中进行实验,而不是在生产环境中。此外,您不能使用任何使用spark context的操作,因为这些操作是在驱动程序上执行的,但是UDF代码被发送到Executor节点,而Executor没有spark context对象

票数 3
EN

Stack Overflow用户

发布于 2015-10-14 16:27:54

dataTable中的每一行都将调用您的UDF matchExpressionCombination,但它涉及到收集一个RDD (regexDf.collect)。这将导致'dataTable`‘的每一行执行一次收集操作,这应该是非常低效的。

您应该加入RDDs,使用UDF函数来确定表的匹配位置,或者在UDF之外将regex RDD收集到本地val中,并在UDF中使用该val。

您的异常显示了Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper,因此您可能应该更详细地说明在代码中使用该类的位置。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33116842

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档