I have two implementations of a cost-calculation Spark job that extend the following trait:

trait Cost {
  val calculateCost: Column
  def df: DataFrame
  lazy val aggregate = df
    .withColumn("cost", calculateCost)
}

Implementation 1:

case class CurrentCost(df: DataFrame) extends Cost {
  override val calculateCost = when(includeCost, $"c1" * $"c2").otherwise(lit(0))
}

Implementation 2 uses a different Column to compute its cost:

case class PreviousCost(df: DataFrame) extends Cost {
  override val calculateCost = callSomeUdf($"c3", $"c4")
}

Both are invoked in a similar way:

val result = CurrentCost(df).aggregate

This throws a runtime exception whose details depend on which implementation is called.

For implementation 1:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: CASE WHEN ((NOT dealerNameIsNull) AND (costType = CURRENT)) THEN (c1 * c2) ELSE 0 END)
- field (class: CostFeatures, name: calculateCost, type: class org.apache.spark.sql.Column)

For implementation 2:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: UDF(c3, c4))

What is a better way to abstract the common functionality out of the concrete implementations?

Posted on 2019-12-14 21:13:39

Raf, I cannot reproduce the serialization problem with the code below:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, udf, when}

sealed trait Cost {
  val calculateCost: Column
  def df: DataFrame
  // A def rather than a lazy val, so the resulting DataFrame is not cached in a field.
  def aggregate = df.withColumn("cost", calculateCost)
}

case class CurrentCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override val calculateCost = when($"includeCost", $"c1" * $"c2").otherwise(lit(0))
}

case class PreviousCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override val calculateCost = MyUdfs.myAwesomeUdf($"c1")
}

// The UDF lives in a standalone object, not inside a Cost instance.
object MyUdfs {
  val myAwesomeUdf = udf { i: Int => i * 1.1F }
}

object MyMain extends App {
  val spark = SparkSession.builder().config(new SparkConf().setMaster("local[4]").setAppName("azeaz")).getOrCreate()
  import spark.implicits._

  val costDF = Seq(
    (1, 1.2, true),
    (2, 1.1, true),
    (3, 1.1, false)
  ).toDF("c1", "c2", "includeCost")

  CurrentCost(costDF).aggregate.show(false)
  PreviousCost(costDF).aggregate.show(false)
}

I get both outputs:

+---+---+-----------+----+
|c1 |c2 |includeCost|cost|
+---+---+-----------+----+
|1 |1.2|true |1.2 |
|2 |1.1|true |2.2 |
|3 |1.1|false |0.0 |
+---+---+-----------+----+
+---+---+-----------+---------+
|c1 |c2 |includeCost|cost |
+---+---+-----------+---------+
|1 |1.2|true |1.1 |
|2 |1.1|true |2.2 |
|3 |1.1|false |3.3000002|
+---+---+-----------+---------+

Am I missing something?
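
One possible reading of the stack trace in the question, offered as an assumption and not something confirmed in this thread: the trace reports a field named calculateCost on a class called CostFeatures, which does not appear in the posted code, so the failing instance may have been captured by a closure that Spark then tried to serialize. The Spark-free sketch below illustrates only that mechanism with plain Java serialization. FakeColumn, EagerCost, OnDemandCost and SerializationSketch are hypothetical names invented for the illustration; FakeColumn merely stands in for a non-serializable value such as the Column in the trace.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable value (deliberately NOT Serializable).
final class FakeColumn(val expr: String)

// Mirrors the shape in the question: the non-serializable value is stored as a field.
class EagerCost extends Serializable {
  val calculateCost: FakeColumn = new FakeColumn("c1 * c2")
}

// Alternative shape: the value is built on demand, so the instance holds no such field.
class OnDemandCost extends Serializable {
  def calculateCost: FakeColumn = new FakeColumn("c1 * c2")
}

object SerializationSketch {
  // Returns true when plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new EagerCost))    // false: the FakeColumn field cannot be written
    println(serializes(new OnDemandCost)) // true: there is no field to write
  }
}
```

If that reading applies, declaring calculateCost as a def in the trait, or keeping closures and UDFs from referencing the enclosing Cost instance, would be candidate fixes to try. Whether either one resolves the original job cannot be told from the code posted here.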
https://stackoverflow.com/questions/59333388