首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala-Spark: Filter DataFrame性能和优化

Scala-Spark: Filter DataFrame性能和优化
EN

Stack Overflow用户
提问于 2019-04-18 23:59:34
回答 1查看 523关注 0票数 1

我想要实现的相当简单:我想检查所有的ID(Uuid)是否经历了某种“状态”(行为状态)。如果是,则将与该ID关联的所有记录返回给我。例如,如果下面的某个ID的状态为"three“,我希望保留与该ID关联的所有记录。到目前为止,我可以通过以下两种方法来实现:

代码语言:javascript
复制
// first method
val dfList = df.filter($"status" === "three").select($"id").distinct.map(_.getString(0)).collect.toList
val dfTransformedOne = df.filter($"id".isin(dfList:_*))

// second method
val dfIds = df.filter($"status" === "three").select($"id").distinct
val dfTransformedTwo = df.join(broadcast(dfIds), Seq("id"))

以上两种方法可以很好地处理我正在处理的样本数据,但是当我开始增加要处理的数据量时,我会遇到一些性能问题,因为我可能有数百万到数亿个ID需要过滤。有没有更有效的方法来做上面的事情,或者应该只是增加我正在使用的硬件?

下面是示例数据和预期输出。

代码语言:javascript
复制
val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

df.show
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
| 234|   one|
| 234|   one|
| 234|   two|
+----+------+

dfTransformed.show()
+----+------+
|  id|status|
+----+------+
|1234|   one|
|1234|   two|
|1234| three|
+----+------+
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-19 02:33:51

在过滤之前进行分组和聚合将引入一种混洗,同时消除了将大列表收集到驱动程序的需要。它是否更快取决于您的数据分布、集群大小和网络连接。不过,这可能值得一试:

代码语言:javascript
复制
val df = Seq(
  ("1234", "one"), 
  ("1234", "two"), 
  ("1234", "three"), 
  ("234", "one"), 
  ("234", "one"), 
  ("234", "two")
  ).toDF("id", "status")

df.groupBy("id")
  .agg(collect_list("status").as("statuses"))
  .filter(array_contains($"statuses", "three"))
  .withColumn("status", explode($"statuses"))
  .select("id", "status")
  .show(false)

给出预期的结果:

代码语言:javascript
复制
+----+------+
|id  |status|
+----+------+
|1234|one   |
|1234|two   |
|1234|three |
+----+------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55749823

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档