我想将几个UDF作为函数参数与数据帧一起传递。
这样做的一种方法可能是在函数中创建UDF,但这将创建和销毁UDF的几个实例,而不重用它--这可能不是解决此问题的最佳方法。
这是一个样本代码-
val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
val df = inputDF1
.withColumn("new_col", lkpUDF(col("c1")))
val df2 = inputDF2.
.withColumn("new_col", lkpUDF(col("c1")))我不想做上面的事,我想做这样的事情-
val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = {
df
.withColumn("new_col", lkpUDF(col("c1")))
}
val df = appendCols(inputDF, lkpUDF)上面的UDF非常简单,但在我的示例中,它可以返回原始类型或用户定义的案例类类型。任何想法或建议都将不胜感激。谢谢。
发布于 2017-02-08 22:10:19
您具有适当签名的功能需要如下:
import org.apache.spark.sql.UserDefinedFunction
def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = {
df.withColumn("new_col", func(col("col1")))
}scala在返回初始化值的类型方面非常有用。
scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))此外,如果传递给udf包装器的函数的签名包含一个Any返回类型(如果该函数可以返回原语或用户定义的case类,情况就是这样),则UDF将无法编译,异常情况如下:
java.lang.UnsupportedOperationException: Schema for type Any is not supportedhttps://stackoverflow.com/questions/42123552
复制相似问题