我有一个下面的数据,其中有用户和主管关系。
user |supervisor |id
-----|-----------|----
a | b | 1
b | c | 2
c | d | 3
e | b | 4我想要分解用户和主管之间的关系层次结构,如下所示。
user |supervisor |id
-----|-----------|----
a | b | 1
a | c | 1
a | d | 1
b | c | 2
b | d | 2
c | d | 3
e | b | 4
e | c | 4
e | d | 4 正如您所看到的,对于用户'a',直接主管是'b‘,但'b’再次将'c‘作为其主管。所以间接地'c‘也是'a’的监督者,以此类推。例如,我的目标是为给定用户分解任意级别的层次结构。在spark-scala中实现这一点的最佳方式是什么?
发布于 2017-07-17 19:08:24
这是一种使用数据帧的方法。我展示了执行层次结构的一个级别,但可以通过重复以下步骤多次执行该操作:
val df = sc.parallelize(Array(("a", "b", 1), ("b", "c", 2), ("c", "d", 3), ("e", "b", 4))).toDF("user", "supervisor", "id")
scala> df.show
+----+----------+---+
|user|supervisor| id|
+----+----------+---+
| a| b| 1|
| b| c| 2|
| c| d| 3|
| e| b| 4|
+----+----------+---+让我们启用交叉连接:
spark.conf.set("spark.sql.crossJoin.enabled", true)然后连接同一个表:
val dfjoin = df.as("df1").join(df.as("df2"), $"df1.supervisor" === $"df2.user", "left").select($"df1.user", $"df1.supervisor".as("s1"), $"df1.id", $"df2.supervisor".as("s2"))我使用udf将两列组合成一个数组:
import org.apache.spark.sql.functions.udf
val combineUdf = udf((x: String, y: String) => Seq(x, y))
val dfcombined = dfjoin.withColumn("combined", combineUdf($"s1", $"s2")).select($"user", $"combined", $"id")然后,最后一步是将数组展平以分离行,并过滤未连接的行:
val dfexplode = dfcombined.withColumn("supervisor", explode($"combined")).select($"user", $"id", $"supervisor").filter($"supervisor".isNotNull)第一级层次结构如下所示:
scala> dfexplode.show
+----+---+----------+
|user| id|supervisor|
+----+---+----------+
| c| 3| d|
| b| 2| c|
| b| 2| d|
| a| 1| b|
| a| 1| c|
| e| 4| b|
| e| 4| c|
+----+---+----------+https://stackoverflow.com/questions/45137381
复制相似问题