我有一个类似于下面的表格:
| id | item |
| -- | ------------------------------------ |
| 1 | {order_id: 1, item_id: 1, price: 10} |
| 2 | {order_id: 1, item_id: 2, price: 11} |
| 3 | {order_id: 2, item_id: 3, price: 12} |
| 4 | {order_id: 2, item_id: 4, price: 13} |我需要将表中的行聚合到以下内容中:
| order_id | order |
| -------- | ------------------------------------------------------------------------ |
| 1 | {order_id: 1, items: [{item_id: 1, price: 10}, {item_id: 2, price: 11}]} |
| 2 | {order_id: 2, items: [{item_id: 3, price: 12}, {item_id: 4, price: 13}]} |最初我认为UDAF可以做到这一点,但当我实现一个聚合器UDAF函数时,我不确定在merge方法中返回什么,因为如果order id不同,它们就不能被合并。
发布于 2021-09-17 01:29:30
假设模型如下:
case class Order(order_id: Int, items: Seq[Item])
case class Item(item_id: Int, price: Double)
case class Line(item: Item)使用groupBy按item.order_id对行进行分组,然后收集以下项:
import sparkSession.implicits._
df.groupBy($"item.order_id")
.as[Int, Line]
.mapGroups { case (order_id, lines) =>
(order_id, Order(order_id, lines.toSeq.map(line => Item(line.item.item_id, line.item.price))))
}发布于 2021-09-17 07:16:31
从Spark1.6及更高版本开始,您不需要UDAF,您可以使用内置的SQL函数collect_list来聚合行对象
如果您表架构如下:
root
|-- id: integer (nullable = false)
|-- item: struct (nullable = true)
| |-- order_id: integer (nullable = true)
| |-- item_id: integer (nullable = true)
| |-- price: double (nullable = true)在dataframe中加载表之后,您的代码应该是:
import org.apache.spark.sql.functions.{collect_list, struct}
dataframe
.groupBy("item.order_id")
.agg(collect_list(struct("item.item_id", "item.price")).as("items"))
.withColumn("order", struct("order_id", "items"))
.drop("items")https://stackoverflow.com/questions/69216699
复制相似问题