我有两个数据帧: dataDf和regexDf。dataDf有大量记录,而regexDf有两列正则表达式。我的问题是,我需要根据regexDef中的两列匹配正则表达式的两列来过滤dataDf。我想出了这个
dataDf.registerTempTable("dataTable")
sqlContext.udf.register("matchExpressionCombination", matchExpressionCombination _)
val matchingResults = sqlContext.sql("SELECT * FROM dataTable WHERE matchExpressionCombination(col1, col2)")
def matchExpressionCombination(col1Text: String, col2Text: String): Boolean = {
val regexDf = getRegexDf()
var isMatch = false
for(row <- regexDf.collect) {
if(col1Text.matches(row(0).toString) && col2Text.matches(row(1).toString)) {
isMatch = true
}
}
isMatch
}当我说
matchingResults.count().println我得到以下错误:-
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#118L])
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
Union
TungstenProject
Filter UDF(Col1Text#97,Col2Text#109)
Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
TungstenProject
Filter UDF(Col1Text#2,Col2Text#14)
Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1403)
at com.salesforce.hammer.clusterer.collector.DeduperTest$.applyBaselineAndUpgradeOnlyPatternTest(DeduperTest.scala:83)
at com.salesforce.hammer.clusterer.collector.Application$.main(Main.scala:53)
at com.salesforce.hammer.clusterer.collector.Application.main(Main.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
Union
TungstenProject
Filter UDF(Col1Text#97,Col2Text#109)
Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
TungstenProject
Filter UDF(Col1Text#2,Col2Text#14)
Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 15 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
Union
TungstenProject
Filter UDF(Col1Text#97,Col2Text#109)
Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
TungstenProject
Filter UDF(Col1Text#2,Col22Text#14)
Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 23 more
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:113)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:309)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:184)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 31 more
Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper
Serialization stack:
- object not serializable (class: com.salesforce.RegexDeduper, value: com.salesforce.RegexDeduper@67b464f4)
- field (class: com.salesforce.RegexDeduper$$anonfun$applyCol1TextAndCol2TextPattern$1, name: $outer, type: class com.salesforce.RegexDeduper)
- object (class com.salesforce.RegexDeduper$$anonfun$applyBaselineAndUpgradePattern$1, <function2>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, name: func$3, type: interface scala.Function2)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(Col1Text#97,Col2Text#109))
- field (class: org.apache.spark.sql.execution.Filter, name: condition, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.execution.Filter, Filter UDF(Col1Text#97,Col2Text#109)
Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2TText#109,UpgradedStacktrace#110,UpgradedTime#111]
)
- field (class: org.apache.spark.sql.execution.Filter$$anonfun$4, name: $outer, type: class org.apache.spark.sql.execution.Filter)
- object (class org.apache.spark.sql.execution.Filter$$anonfun$4, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)发布于 2015-12-16 14:39:17
不能在UDF中使用collect操作,因为所有数据都会被发送到该节点,并且collect应该仅用于在spark-shell类型的环境中进行实验,而不是在生产环境中。此外,您不能使用任何使用spark context的操作,因为这些操作是在驱动程序上执行的,但是UDF代码被发送到Executor节点,而Executor没有spark context对象
发布于 2015-10-14 16:27:54
dataTable中的每一行都将调用您的UDF matchExpressionCombination,但它涉及到收集一个RDD (regexDf.collect)。这将导致'dataTable`‘的每一行执行一次收集操作,这应该是非常低效的。
您应该加入RDDs,使用UDF函数来确定表的匹配位置,或者在UDF之外将regex RDD收集到本地val中,并在UDF中使用该val。
您的异常显示了Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper,因此您可能应该更详细地说明在代码中使用该类的位置。
https://stackoverflow.com/questions/33116842
复制相似问题