我想要实现的相当简单:我想检查所有的ID(Uuid)是否经历了某种“状态”(行为状态)。如果是,则将与该ID关联的所有记录返回给我。例如,如果下面的某个ID的状态为"three“,我希望保留与该ID关联的所有记录。到目前为止,我可以通过以下两种方法来实现:
// first method
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))
// second method
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))以上两种方法可以很好地处理我正在处理的样本数据,但是当我开始增加要处理的数据量时,我会遇到一些性能问题,因为我可能有数百万到数亿个ID需要过滤。有没有更有效的方法来做上面的事情,或者应该只是增加我正在使用的硬件?
下面是示例数据和预期输出。
val df = Seq(
("1234", "one"),
("1234", "two"),
("1234", "three"),
("234", "one"),
("234", "one"),
("234", "two")
).toDF("id", "status")
df.show
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
| 234| one|
| 234| one|
| 234| two|
+----+------+
dfTransformed.show()
+----+------+
| id|status|
+----+------+
|1234| one|
|1234| two|
|1234| three|
+----+------+发布于 2019-04-19 02:33:51
在过滤之前进行分组和聚合将引入一种混洗,同时消除了将大列表收集到驱动程序的需要。它是否更快取决于您的数据分布、集群大小和网络连接。不过,这可能值得一试:
val df = Seq(
("1234", "one"),
("1234", "two"),
("1234", "three"),
("234", "one"),
("234", "one"),
("234", "two")
).toDF("id", "status")
df.groupBy("id")
.agg(collect_list("status").as("statuses"))
.filter(array_contains($"statuses", "three"))
.withColumn("status", explode($"statuses"))
.select("id", "status")
.show(false)给出预期的结果:
+----+------+
|id |status|
+----+------+
|1234|one |
|1234|two |
|1234|three |
+----+------+https://stackoverflow.com/questions/55749823
复制相似问题