首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Scala中编写Spark UDAF以返回数组类型作为输出

在Scala中编写Spark UDAF以返回数组类型作为输出
EN

Stack Overflow用户
提问于 2018-04-05 01:57:11
回答 3查看 628关注 0票数 1

我有一个如下的数据框架-

代码语言:javascript
复制
val myDF = Seq(
(1,"A",100),
(1,"E",300),
(1,"B",200),
(2,"A",200),
(2,"C",300),
(2,"D",100)
).toDF("id","channel","time")

myDF.show()

+---+-------+----+
| id|channel|time|
+---+-------+----+
|  1|      A| 100|
|  1|      E| 300|
|  1|      B| 200|
|  2|      A| 200|
|  2|      C| 300|
|  2|      D| 100|
+---+-------+----+

对于每个id,我希望通道按time以升序方式排序。我想为这个逻辑实现一个UDAF。

我想把这个叫UDAF -

代码语言:javascript
复制
scala > spark.sql("""select customerid , myUDAF(customerid,channel,time) group by customerid """).show()

输出数据帧应该是这样的-

代码语言:javascript
复制
+---+-------+
| id|channel|
+---+-------+
|  1|[A,B,E]|
|  2|[D,A,C]|
+---+-------+

我正在尝试写一个UDAF,但无法实现它-

代码语言:javascript
复制
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._



class myUDAF extends UserDefinedAggregateFunction {

    // This is the input fields for your aggregate function 
    override def inputSchema : org.apache.spark.sql.types.Structype = 
        Structype(
            StructField("id" , IntegerType)
            StructField("channel", StringType)
            StructField("time", IntegerType) :: Nil
        )

    // This is the internal fields we would keep for computing the aggregate 
    // output 
    override def bufferSchema : Structype = 
        Structype(
            StructField("Sequence", ArrayType(StringType)) :: Nil
        )

    // This is the output type of my aggregate function 
    override def dataType : DataType = ArrayType(StringType)

    // no comments here
    override def deterministic : Booelan = true 

    // initialize 
    override def initialize(buffer: MutableAggregationBuffer) : Unit = {
        buffer(0) = Seq("")
    }





}

请帮帮忙。

EN

回答 3

Stack Overflow用户

发布于 2018-04-05 02:13:24

这就完成了(不需要定义您自己的UDF):

代码语言:javascript
复制
df.groupBy("id")
  .agg(sort_array(collect_list(  // NOTE: sort based on the first element of the struct
         struct("time", "channel"))).as("stuff"))
  .select("id", "stuff.channel")
  .show(false)

+---+---------+
|id |channel  |
+---+---------+
|1  |[A, B, E]|
|2  |[D, A, C]|
+---+---------+
票数 3
EN

Stack Overflow用户

发布于 2018-04-05 02:10:43

我不会为此写一个UDAF。根据我的经验,UDAF相当慢,特别是复杂类型的时候。我将使用collect_list & UDF方法:

代码语言:javascript
复制
val sortByTime = udf((rws:Seq[Row]) => rws.sortBy(_.getInt(0)).map(_.getString(1)))

myDF
  .groupBy($"id")
  .agg(collect_list(struct($"time",$"channel")).as("channel"))
  .withColumn("channel", sortByTime($"channel"))
  .show()

+---+---------+
| id|  channel|
+---+---------+
|  1|[A, B, E]|
|  2|[D, A, C]|
+---+---------+
票数 2
EN

Stack Overflow用户

发布于 2018-04-05 02:55:20

一种没有UDF的简单方法。

代码语言:javascript
复制
import org.apache.spark.sql.functions._
myDF.orderBy($"time".asc).groupBy($"id").agg(collect_list($"channel") as "channel").show()
票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49657498

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档