我只是列出了两种解决方案,我曾试图实现一个用例,将sure应用到某些列中,但我不知道为什么我的两个函数的行为方式都完全不同,即使我试图实现相同的功能。有人能解释一下内部的工作吗?这两种情况到底发生了什么?
职能1:
def transformColumns(df: DataFrame, transformationType: String, sanitizationList: List[Sanitization]): DataFrame = {
try {
sanitizationList.foldLeft(df) {
(outerAccumulator: DataFrame, sanitization: Sanitization) =>
val aes: TAlgorithm = new AES256(key, iv)
@transient lazy val udfFunction = udf(aes.decrypt(_)
sanitization.column.foldLeft(outerAccumulator: DataFrame) {
(innerAccumulator: DataFrame, elem: String) =>
innerAccumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
}
}
}职能2:
def transformColumns(df: DataFrame, columns: Map[Seq[String], TAlgorithm]): DataFrame = {
try {
columns.foldLeft(df) {
(accumulator: DataFrame, sanitization: (Seq[String], TAlgorithm)) =>
import org.apache.spark.sql.functions.udf
val aes: TAlgorithm = new AES256(key, iv)
@transient lazy val udfFunction = udf(aes.decrypt(_))
sanitization._1.foreach{
elem => accumulator.withColumn(elem, when(col(elem).isNotNull, udfFunction(col(elem))).otherwise(lit(null)))
}
accumulator
}
}在第二种情况下,没有一个列被转换,不知道为什么。
发布于 2021-10-08 09:09:59
第一个示例中的
( sanitization.column.foldLeft(outerAccumulator: DataFrame) { (innerAccumulator: DataFrame,elem: String) => innerAccumulator.withColumn(elem,null(Elem).isNotNull,udfFunction(col(Elem).otherwise(lit(Null))}}
foldLeft计算下一次迭代时在innerAccumulator:DataFrame中的最后一行(innerAccumulator.withColumn...)。。
第二个例子中的
sanitization._1.foreach{ elem => accumulator.withColumn(elem,null(Elem).isNotNull,udfFunction(elem(Elem).otherwise(lit(Null)}
DataFrame是不可变的,所以withColumn返回一个新的DataFrame。但是由于for_each返回Unit,由accumulator.withColumn创建的新DF将丢失。
https://stackoverflow.com/questions/69491816
复制相似问题