我有一个如下的数据框架-
val myDF = Seq(
(1,"A",100),
(1,"E",300),
(1,"B",200),
(2,"A",200),
(2,"C",300),
(2,"D",100)
).toDF("id","channel","time")
myDF.show()
+---+-------+----+
| id|channel|time|
+---+-------+----+
| 1| A| 100|
| 1| E| 300|
| 1| B| 200|
| 2| A| 200|
| 2| C| 300|
| 2| D| 100|
+---+-------+----+对于每个id,我希望通道按time以升序方式排序。我想为这个逻辑实现一个UDAF。
我想把这个叫UDAF -
scala > spark.sql("""select customerid , myUDAF(customerid,channel,time) group by customerid """).show()输出数据帧应该是这样的-
+---+-------+
| id|channel|
+---+-------+
| 1|[A,B,E]|
| 2|[D,A,C]|
+---+-------+我正在尝试写一个UDAF,但无法实现它-
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class myUDAF extends UserDefinedAggregateFunction {
// This is the input fields for your aggregate function
override def inputSchema : org.apache.spark.sql.types.Structype =
Structype(
StructField("id" , IntegerType)
StructField("channel", StringType)
StructField("time", IntegerType) :: Nil
)
// This is the internal fields we would keep for computing the aggregate
// output
override def bufferSchema : Structype =
Structype(
StructField("Sequence", ArrayType(StringType)) :: Nil
)
// This is the output type of my aggregate function
override def dataType : DataType = ArrayType(StringType)
// no comments here
override def deterministic : Booelan = true
// initialize
override def initialize(buffer: MutableAggregationBuffer) : Unit = {
buffer(0) = Seq("")
}
}请帮帮忙。
发布于 2018-04-05 02:13:24
这就完成了(不需要定义您自己的UDF):
df.groupBy("id")
.agg(sort_array(collect_list( // NOTE: sort based on the first element of the struct
struct("time", "channel"))).as("stuff"))
.select("id", "stuff.channel")
.show(false)
+---+---------+
|id |channel |
+---+---------+
|1 |[A, B, E]|
|2 |[D, A, C]|
+---+---------+发布于 2018-04-05 02:10:43
我不会为此写一个UDAF。根据我的经验,UDAF相当慢,特别是复杂类型的时候。我将使用collect_list & UDF方法:
val sortByTime = udf((rws:Seq[Row]) => rws.sortBy(_.getInt(0)).map(_.getString(1)))
myDF
.groupBy($"id")
.agg(collect_list(struct($"time",$"channel")).as("channel"))
.withColumn("channel", sortByTime($"channel"))
.show()
+---+---------+
| id| channel|
+---+---------+
| 1|[A, B, E]|
| 2|[D, A, C]|
+---+---------+发布于 2018-04-05 02:55:20
一种没有UDF的简单方法。
import org.apache.spark.sql.functions._
myDF.orderBy($"time".asc).groupBy($"id").agg(collect_list($"channel") as "channel").show()https://stackoverflow.com/questions/49657498
复制相似问题