我正在尝试在PySpark中的Spark dataframe上使用一组预定义的sql函数以及我自己的UDAF。
@F.udf
def mode(v):
from collections import Counter
x = [w[0] for w in Counter(v).most_common(5)]
return x
funs = [mean, max, min, stddev, approxCountDistinct, mode]
columns = df.columns
expr = [f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()当我尝试将我的自定义函数与其他函数一起使用时,我得到: org.apache.spark.sql.AnalysisException: grouping expressions is empty。Wrap '(avg(CAST(DBN AS DOUBLE)作为窗口函数中的avg(DBN)或将'DBN‘包装在first() (或first_value)中(如果您不关心获得哪个值)。;;
但是当我运行的时候:
funs = [mode]
columns = df.columns
expr = [f(collect_list(col(c))) for f in funs for c in columns]
s = df.agg(*expr).collect()它给出了正确的结果,但仅适用于我的UDF,而不适用于其他函数。
有没有一种方法可以将collect_list函数组合到我的自定义函数中,这样我就可以和其他函数一起运行我的自定义函数。
发布于 2019-11-15 14:55:36
你得到这个错误是因为你在聚合函数中使用udf,而你本应该使用UDAF。1.您可以通过遵循How to define and use a User-Defined Aggregate Function in Spark SQL?定义您自己的UDAF,或者2.您可以人工进行汇总,然后传递给您的udf。当你想在调用你的udf之前使用collect_list时,你可以这样做:
@F.udf
def mode(v):
from collections import Counter
x = [w[0] for w in Counter(v).most_common(5)]
return x
funs = [mean, max, min, stddev, approxCountDistinct, mode]
my_funs = [mode]
expr = [f(collect_list(col(c))) if f in my_funs else f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()在上面的代码中,在对列调用udf之前,使用collect_list进行聚合。
https://stackoverflow.com/questions/58870057
复制相似问题