
Apache Spark: trait calling implementation throws java.io.NotSerializableException on Column

Stack Overflow user
Asked on 2019-12-14 16:06:36
1 answer · 103 views · 0 followers · 0 votes

I have two implementations of a cost-calculation Spark job, both extending the following trait:

```scala
import org.apache.spark.sql.{Column, DataFrame}

trait Cost {

  val calculateCost: Column

  def df: DataFrame

  lazy val aggregate = df
    .withColumn("cost", calculateCost)

}
```

Implementation 1:

```scala
case class CurrentCost(df: DataFrame) extends Cost {
  // includeCost is a Column defined elsewhere (not shown in the question)
  override val calculateCost = when(includeCost, $"c1" * $"c2").otherwise(lit(0))
}
```

Implementation 2 uses a different Column to calculate its cost:

```scala
case class PreviousCost(df: DataFrame) extends Cost {
  override val calculateCost = callSomeUdf($"c3", $"c4")
}
```

Both are invoked the same way:

```scala
val result = CurrentCost(df).aggregate
```

This throws a runtime exception that differs depending on which implementation is invoked.

For implementation 1:

```text
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: CASE WHEN ((NOT dealerNameIsNull) AND (costType = CURRENT)) THEN (c1 * c2) ELSE 0 END)
    - field (class: CostFeatures, name: calculateCost, type: class org.apache.spark.sql.Column)
```
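The `field (class: CostFeatures, name: calculateCost, ...)` line says Java serialization reached the `Column` through the `calculateCost` field of the object being serialized. A case class is `Serializable`, so serializing it writes every field, and a `val` declared in the trait becomes such a field in each implementation. The sketch below reproduces the same failure without Spark. `FakeColumn` is a hypothetical stand-in for `org.apache.spark.sql.Column` (which, as the trace shows, is not serializable), and `EagerCost`, `CostLike` and `SerializationDemo` are illustrative names, not code from the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column: it does not implement Serializable.
final class FakeColumn(val expr: String)

trait CostLike {
  val calculateCost: FakeColumn
}

// Case classes extend Serializable, so Java serialization writes all of their fields,
// including the one that backs the overridden val.
case class EagerCost(name: String) extends CostLike {
  override val calculateCost: FakeColumn = new FakeColumn("c1 * c2")
}

object SerializationDemo {
  // Returns true if `obj` can be written with plain Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Fails for the same reason as the stack trace: the calculateCost field is not serializable.
    println(serializes(EagerCost("current"))) // prints false
  }
}
```

The `false` result mirrors the `object not serializable` / `field ... name: calculateCost` chain in the trace. Presumably Spark only runs into it when something it ships to executors holds a reference to the case class instance itself.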

For implementation 2:

```text
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: UDF(c3, c4))
```

What is a better way to abstract the common functionality away from the concrete implementations?
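One common way to keep the shared `aggregate` logic in the trait while avoiding a serialized `Column` field is to declare the member as an abstract `def` and let each implementation supply it either as a `def` (no field at all) or as a `@transient lazy val` (cached, but skipped by serialization). This is a general Scala/Java serialization technique rather than a fix confirmed in this thread; the sketch again uses a hypothetical `FakeColumn` stand-in and illustrative class names (`DefCost`, `TransientCost`, `FixDemo`).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column (not Serializable).
final class FakeColumn(val expr: String)

trait CostLike {
  // Abstract def: implementations decide how (and whether) to store the value.
  def calculateCost: FakeColumn
}

// Option 1: a def builds the value on each call and adds no field to the class.
case class DefCost(name: String) extends CostLike {
  override def calculateCost: FakeColumn = new FakeColumn("c1 * c2")
}

// Option 2: a @transient lazy val is cached after first use but skipped by serialization;
// after deserialization it would be recomputed on first access.
case class TransientCost(name: String) extends CostLike {
  @transient override lazy val calculateCost: FakeColumn = new FakeColumn("udf(c3, c4)")
}

object FixDemo {
  // Returns true if `obj` can be written with plain Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val transientCost = TransientCost("previous")
    println(transientCost.calculateCost.expr) // forces the lazy val; prints udf(c3, c4)
    println(serializes(DefCost("current")))   // prints true
    println(serializes(transientCost))        // prints true
  }
}
```

Applied to the question's code, this would mean changing `val calculateCost: Column` to `def calculateCost: Column` in `Cost`, while each implementation keeps building its `Column` with `when(...)` or the UDF as before.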


1 answer

Stack Overflow user

Accepted answer

Posted on 2019-12-14 21:13:39

Raf, I can't reproduce the serialization problem with the following code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, udf, when}

sealed trait Cost {
  val calculateCost: Column
  def df: DataFrame
  def aggregate = df.withColumn("cost", calculateCost)
}

case class CurrentCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._ // enables the $"col" syntax
  override val calculateCost = when($"includeCost", $"c1" * $"c2").otherwise(lit(0))
}
case class PreviousCost(df: DataFrame) extends Cost {
  import df.sparkSession.implicits._
  override val calculateCost = MyUdfs.myAwesomeUdf($"c1")
}

// UDFs live in a standalone object, not inside the case classes
object MyUdfs {
  val myAwesomeUdf = udf { i: Int => i * 1.1F }
}

object MyMain extends App {

  val spark = SparkSession.builder().config(new SparkConf().setMaster("local[4]").setAppName("azeaz")).getOrCreate()

  import spark.implicits._

  val costDF = Seq(
    (1, 1.2, true),
    (2, 1.1, true),
    (3, 1.1, false)
  ).toDF("c1", "c2", "includeCost")

  CurrentCost(costDF).aggregate.show(false)
  PreviousCost(costDF).aggregate.show(false)

}
```

I get the following two outputs:

```text
+---+---+-----------+----+
|c1 |c2 |includeCost|cost|
+---+---+-----------+----+
|1  |1.2|true       |1.2 |
|2  |1.1|true       |2.2 |
|3  |1.1|false      |0.0 |
+---+---+-----------+----+

+---+---+-----------+---------+
|c1 |c2 |includeCost|cost     |
+---+---+-----------+---------+
|1  |1.2|true       |1.1      |
|2  |1.1|true       |2.2      |
|3  |1.1|false      |3.3000002|
+---+---+-----------+---------+
```

Am I missing something?
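A plausible reason the answer's code runs cleanly (an assumption, since the question does not show `includeCost` or `callSomeUdf`) is that nothing in it makes Spark serialize a `CurrentCost` or `PreviousCost` instance. Building a `Column` and calling `withColumn` happen on the driver; the instance only has to travel to executors if a function Spark ships, such as a UDF body, reads one of its members and therefore captures `this`. The Spark-free sketch below contrasts a lambda that captures `this` with one that captures only a local copy. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas; `CostWithHelper`, `factor`, `FakeColumn` and `ClosureDemo` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column (not Serializable).
final class FakeColumn(val expr: String)

case class CostWithHelper(factor: Double) {
  // Non-serializable field, like calculateCost in the question.
  val calculateCost: FakeColumn = new FakeColumn("c1 * c2")

  // Reads the member `factor`, so the lambda captures `this` (and with it calculateCost).
  def capturingFn: Int => Double = i => i * factor

  // Copies the member into a local first, so the lambda captures only a Double.
  def localCopyFn: Int => Double = {
    val f = factor
    i => i * f
  }
}

object ClosureDemo {
  // Returns true if `obj` can be written with plain Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val cost = CostWithHelper(1.1)
    println(serializes(cost.capturingFn)) // prints false
    println(serializes(cost.localCopyFn)) // prints true
  }
}
```

If the question's UDF or `includeCost` is built along the lines of `capturingFn`, that would explain why the case class, and its `Column` field, end up in the serialization stack.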

Votes: 0
Original page content provided by Stack Overflow; translation support by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/59333388
