首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在PySpark中将sql函数与UDAF组合/链接

如何在PySpark中将sql函数与UDAF组合/链接
EN

Stack Overflow用户
提问于 2019-11-15 11:47:10
回答 1查看 221关注 0票数 0

我正在尝试在PySpark中的Spark dataframe上使用一组预定义的sql函数以及我自己的UDAF。

代码语言:javascript
复制
    @F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

   funs = [mean, max, min, stddev, approxCountDistinct, mode]
   columns = df.columns
   expr = [f(col(c)) for f in funs for c in columns]

   s = df.agg(*expr).collect()

当我尝试将我的自定义函数与其他函数一起使用时,我得到: org.apache.spark.sql.AnalysisException: grouping expressions is empty。Wrap '(avg(CAST(DBN AS DOUBLE)作为窗口函数中的avg(DBN)或将'DBN‘包装在first() (或first_value)中(如果您不关心获得哪个值)。;;

但是当我运行的时候:

代码语言:javascript
复制
funs = [mode]
   columns = df.columns
   expr = [f(collect_list(col(c))) for f in funs for c in columns]

   s = df.agg(*expr).collect()

它给出了正确的结果,但仅适用于我的UDF,而不适用于其他函数。

有没有一种方法可以将collect_list函数组合到我的自定义函数中,这样我就可以和其他函数一起运行我的自定义函数。

EN

回答 1

Stack Overflow用户

发布于 2019-11-15 14:55:36

你得到这个错误是因为你在聚合函数中使用udf,而你本应该使用UDAF。1.您可以通过遵循How to define and use a User-Defined Aggregate Function in Spark SQL?定义您自己的UDAF,或者2.您可以人工进行汇总,然后传递给您的udf。当你想在调用你的udf之前使用collect_list时,你可以这样做:

代码语言:javascript
复制
@F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

funs = [mean, max, min, stddev, approxCountDistinct, mode]
my_funs = [mode]
expr = [f(collect_list(col(c))) if f in my_funs  else f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()

在上面的代码中,在对列调用udf之前,使用collect_list进行聚合。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58870057

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档