首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark DataFrame:多列上的多个聚合函数

Spark DataFrame:多列上的多个聚合函数
EN

Stack Overflow用户
提问于 2018-10-17 10:55:20
回答 1查看 2K关注 0票数 0

我有一个聚合函数列表,别名和其他JSON配置,如

代码语言:javascript
复制
{
    "aggregation": [{
        "alias_column_name1": {
            "sum": "<columnName1>"
        }
    }, {
        "alias_column_name2": {
            "sum": "<columnName1>"
        }
    }]
}

目前,我通过以下代码执行了相同的操作:

代码语言:javascript
复制
val col1:Column = sum(<dataframeName>(<columnName1>)).alias(<alias_column_name1>)
val col2:Column = sum(<dataframeName>(<columnName2>)).alias(<alias_column_name2>)
dataframe.groupby(..).agg(col1, col2)

但是我有很多聚合配置,我想在聚合方法中传递这类配置的列表,比如

代码语言:javascript
复制
val colList = List[Column](col1, col2)
dataframe.groupby(..).agg(colList)

我怎样才能做到这一点呢?谢谢

版本:

代码语言:javascript
复制
Scala : 2.11
Spark : 2.2.2
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-10-17 11:18:20

列和函数的单独列表

假设您有一个函数列表:

代码语言:javascript
复制
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val funs: Seq[Column => Column] = Seq(sum _, min _, max _)

和列的列表

代码语言:javascript
复制
val cols: Seq[Column] = Seq($"y", $"z")

和一个数据集

代码语言:javascript
复制
val df = Seq((1, 2, 3), (1, 4, 5) ).toDF("x", "y", "z")

你可以把这两者结合起来

代码语言:javascript
复制
val exprs = for { c <- cols; f <- funs} yield f(c)

然后

代码语言:javascript
复制
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

在PySpark中也可以这样做:

代码语言:javascript
复制
from pyspark.sql import functions as F

funs = [F.sum, F.min, F.max]
cols = ["y", "z"]

df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))

df.groupBy("x").agg(*[f(c) for c in cols for f in funs])

为每列预定义的操作列表

如果您想从预先定义的别名、列和函数集合开始,如您的问题中所示,那么可能更容易将其重构为

代码语言:javascript
复制
trait AggregationOp {
  def expr: Column
}

case class FuncAggregationOp(c: Column, func: Column => Column, alias: String
    ) extends AggregationOp {
  def expr = func(c).alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   FuncAggregationOp($"y", sum _, "alias_column_name1"),
   FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)

您可以轻松地调整它以处理其他情况:

代码语言:javascript
复制
case class StringAggregationOp(c: String, func: String, alias: String
    ) extends AggregationOp {
  def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   StringAggregationOp("y", "sum", "alias_column_name1"),
   StringAggregationOp("z", "sum", "alias_column_name2")
)

Python的等价物可以是这样的:

代码语言:javascript
复制
from collections import namedtuple
from pyspark.sql import functions as F

class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
    def expr(self):
        if callable(self.func):
            return self.func(self.c).alias(self.alias)
        else:
            return F.expr("{func}(`{c}`)".format
                (func = self.func, c = self.c)).alias(self.alias)

ops = [
    AggregationOp("y", "sum", "alias_column_name1"),
    AggregationOp("z", "sum", "alias_column_name2")
]

 df.groupBy("x").agg(*[op.expr() for op in ops])

相关问题

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52853232

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档