I have a Spark DataFrame that I need to convert into (key, value) pairs. The format is as follows:
+--------------------+--------------------+-------------------+------+------+
| cid| uid| date|rating| type|
+--------------------+--------------------+-------------------+------+------+
| 1111111111| user1-316|2019-10-11 14:01:49| 1|others|
| 1111111111| user1|2019-10-11 14:25:35| 2|mobile|
| 1111111111| user2|2019-10-11 14:30:05| 3|others|
| 1111111112| user2|2019-10-11 14:16:58| 4|others|
| 1111111113| user2|2019-10-11 14:32:00| 1|mobile|
+--------------------+--------------------+-------------------+------+------+

I need to aggregate it by uid and, for each type, build a list of (cid, rating, date):
uid | history
-----------+--------------------------------------------------------
user1-316 | {"others": [["1111111111", 1, "2019-10-11 14:01:49"]]}
user1 | {"mobile": [["1111111111", 2, "2019-10-11 14:25:35"]]}
user2     | {"others": [["1111111111", 1, "2019-10-11 14:01:49"]]}
user2     | {"others": [["1111111111", 3, "2019-10-11 14:30:05"],["1111111112", 4, "2019-10-11 14:16:58"]],"mobile":[["1111111113", 1, "2019-10-11 14:32:00"]]}

In Python I can do this because we have the dict type. How can we do this in Scala?
Posted on 2019-10-18 18:49:03
Updated answer:
You can try something like this. I am not sure about Python's dict, but for (key, value) pairs Scala has the map type.
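For reference, Scala's immutable `Map` is the closest counterpart to Python's dict. Before involving Spark at all, here is a minimal plain-Scala sketch of the target structure, built from the sample rows in the question (the nested `uid -> (type -> list of [cid, rating, date])` shape is an illustration, not the Spark result itself):

```scala
// Plain Scala (no Spark): build the desired per-uid history with nested Maps.
// Rows are (cid, uid, date, rating, type) from the question's sample data.
val rows = List(
  ("1111111111", "user1-316", "2019-10-11 14:01:49", 1, "others"),
  ("1111111111", "user1",     "2019-10-11 14:25:35", 2, "mobile"),
  ("1111111111", "user2",     "2019-10-11 14:30:05", 3, "others"),
  ("1111111112", "user2",     "2019-10-11 14:16:58", 4, "others"),
  ("1111111113", "user2",     "2019-10-11 14:32:00", 1, "mobile")
)

// uid -> (type -> list of [cid, rating, date]), mirroring the asker's dict
val history: Map[String, Map[String, List[List[String]]]] =
  rows.groupBy(_._2).map { case (uid, rs) =>
    uid -> rs.groupBy(_._5).map { case (t, trs) =>
      t -> trs.map(r => List(r._1, r._4.toString, r._3))
    }
  }

println(history("user2"))
```

`groupBy` on a Scala `List` preserves the original order of elements within each group, so the per-type lists come out in row order, matching the desired output.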
scala> df.show
+----------+---------+-------------------+------+------+
| cid| uid| date|rating| type|
+----------+---------+-------------------+------+------+
|1111111111|user1-316|2019-10-11 14:01:49| 1|others|
|1111111111| user1|2019-10-11 14:25:35| 2|mobile|
|1111111111| user2|2019-10-11 14:30:05| 3|others|
|1111111112| user2|2019-10-11 14:16:58| 4|others|
|1111111113| user2|2019-10-11 14:32:00| 1|mobile|
+----------+---------+-------------------+------+------+
scala> df.withColumn("col1",array("cid","rating","date"))
.groupBy("type","uid")
.agg(map(col("type"),collect_list("col1")).as("col2"))
.groupBy("uid")
.agg(collect_list(col("col2")).as("history"))
.show(false)
+---------+----------------------------------------------------------------------------------------------------------------------------------------------+
|uid |history |
+---------+----------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[[others -> [[1111111111, 1, 2019-10-11 14:01:49]]]] |
|user1 |[[mobile -> [[1111111111, 2, 2019-10-11 14:25:35]]]] |
|user2 |[[others -> [[1111111111, 3, 2019-10-11 14:30:05], [1111111112, 4, 2019-10-11 14:16:58]]], [mobile -> [[1111111113, 1, 2019-10-11 14:32:00]]]]|
+---------+----------------------------------------------------------------------------------------------------------------------------------------------+

Posted on 2019-10-18 18:55:00
As noted above, we can have Python-style (key, value) pairs in Scala, though the representation differs: Scala's map uses `->` rather than dict syntax.
First, read the data:
scala> val df = Seq((1111111111,"user1-316","2019-10-1114:01:49",1,"others"), (1111111111,"user1","2019-10-1114:25:35",2,"mobile"), (1111111111,"user2","2019-10-1114:30:05",3,"others"), (1111111112,"user2","2019-10-1114:16:58",4,"others"), (1111111113,"user2","2019-10-1114:32:00",1,"mobile")).toDF("cid","uid","date","rating","type")
df: org.apache.spark.sql.DataFrame = [cid: int, uid: string ... 3 more fields]
scala> df.show
+----------+---------+------------------+------+------+
| cid| uid| date|rating| type|
+----------+---------+------------------+------+------+
|1111111111|user1-316|2019-10-1114:01:49| 1|others|
|1111111111| user1|2019-10-1114:25:35| 2|mobile|
|1111111111| user2|2019-10-1114:30:05| 3|others|
|1111111112| user2|2019-10-1114:16:58| 4|others|
|1111111113| user2|2019-10-1114:32:00| 1|mobile|
+----------+---------+------------------+------+------+

Now we collect cid, rating, and date into a list per (uid, type):
scala> val df1 = df.groupBy($"uid", $"type").agg(collect_list(array($"cid", $"rating", $"date")).as("aggNew"))
df1: org.apache.spark.sql.DataFrame = [uid: string, type: string ... 1 more field]
scala> df1.show(false)
+---------+------+--------------------------------------------------------------------------------------------------+
|uid |type |aggNew |
+---------+------+--------------------------------------------------------------------------------------------------+
|user1 |mobile|[WrappedArray(1111111111, 2, 2019-10-1114:25:35)] |
|user2 |mobile|[WrappedArray(1111111113, 1, 2019-10-1114:32:00)] |
|user1-316|others|[WrappedArray(1111111111, 1, 2019-10-1114:01:49)] |
|user2 |others|[WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)]|
+---------+------+--------------------------------------------------------------------------------------------------+

Finally, apply groupBy on uid to get the desired (key, value) pairs:
scala> df1.groupBy($"uid").agg(collect_list(map($"type", $"aggNew"))).show(false)
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid |collect_list(map(type, aggNew)) |
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user1-316|[Map(others -> WrappedArray(WrappedArray(1111111111, 1, 2019-10-1114:01:49)))] |
|user1 |[Map(mobile -> WrappedArray(WrappedArray(1111111111, 2, 2019-10-1114:25:35)))] |
|user2 |[Map(mobile -> WrappedArray(WrappedArray(1111111113, 1, 2019-10-1114:32:00))), Map(others -> WrappedArray(WrappedArray(1111111111, 3, 2019-10-1114:30:05), WrappedArray(1111111112, 4, 2019-10-1114:16:58)))]|
+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Schema for reference:
root
|-- uid: string (nullable = true)
|-- collect_list(map(type, aggNew)): array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: array (valueContainsNull = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: string (containsNull = true)

Source: https://stackoverflow.com/questions/58447486
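Note that, per the schema, the aggregated column is an array of single-entry maps (one map per type), not one merged map like Python's dict. A plain-Scala sketch of how such single-entry maps fold into one, using hypothetical values copied from the user2 row (in Spark 2.4+, the `map_concat` function could perform a similar merge inside the query, though that is not shown here):

```scala
// Plain Scala: the aggregation above yields, per uid, a list of single-entry
// maps (one per type). Folding them with ++ merges them into a single map,
// matching the asker's desired dict-like shape.
val perType: List[Map[String, List[List[String]]]] = List(
  Map("others" -> List(
    List("1111111111", "3", "2019-10-11 14:30:05"),
    List("1111111112", "4", "2019-10-11 14:16:58"))),
  Map("mobile" -> List(
    List("1111111113", "1", "2019-10-11 14:32:00")))
)

// ++ on immutable maps merges entries; later maps win on duplicate keys,
// which cannot occur here since each type appears in exactly one map.
val merged: Map[String, List[List[String]]] =
  perType.foldLeft(Map.empty[String, List[List[String]]])(_ ++ _)

println(merged.keySet)
```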