首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花数据聚集过程中的筛选数组值

火花数据聚集过程中的筛选数组值
EN

Stack Overflow用户
提问于 2021-09-26 16:27:00
回答 1查看 488关注 0票数 2

我正在对以下数据进行聚合,以获得具有一系列品牌的广告商列表

代码语言:javascript
复制
+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

这是我的代码:

代码语言:javascript
复制
import org.apache.spark.sql.functions.collect_list

df2
  .groupBy("advertiser")
  .agg(collect_list("brand").as("brands"))

这给了我以下数据:

代码语言:javascript
复制
+------------+----------------+
|advertiser  |brands          |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+

在聚合过程中,我希望使用以下品牌列表筛选品牌列表:

代码语言:javascript
复制
+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

为了实现以下目标:

代码语言:javascript
复制
+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
+------------+--------+
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-26 20:59:33

对于您的问题,我看到了两种解决方案,我将调用Collect解决方案Join解决方案

收集溶液

如果您可以收集您的brands数据,那么在执行collect_list时,可以使用这个收集的集合来保持正确的品牌,然后flatten数组并用null替换空数组,如下所示:

代码语言:javascript
复制
import org.apache.spark.sql.functions.{array, col, collect_list, flatten, size, when}

val filteredBrands = brands.select("brand").collect().map(_.getString(0))

val finalDataframe = df2
  .groupBy("advertiser")
  .agg(collect_list(when(col("brand").isin(filteredBrands: _*), array(col("brand"))).otherwise(array())).as("brands"))
  .withColumn("brands", flatten(col("brands")))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))

连接解

如果您的brands dataframe不适合内存,您可以先离开“加入df2”和“brands”,以便在一个列中包含品牌(如果该品牌位于brands dataframe中,否则是null ),然后按组进行,最后替换由于广告商没有要通过null筛选的品牌而导致的空数组。

代码语言:javascript
复制
import org.apache.spark.sql.functions.{col, collect_list}

val finalDataframe = df2
  .join(brands.select(col("brand").as("filtered_brand")), col("filtered_brand") === col("brand"), "left_outer")
  .groupBy("advertiser").agg(collect_list(col("filtered_brand")).as("brands"))
  .withColumn("brands", when(size(col("brands")).equalTo(0), null).otherwise(col("brands")))

详细信息

因此,如果我们从一个df2数据文件开始,如下所示:

代码语言:javascript
复制
+------------+------+
|advertiser  |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+

以及如下所示的brands数据:

代码语言:javascript
复制
+------+------------+
|brand |brand name  |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+

df2brands dataframes之间的第一个左外部联接(第一行)之后,您将得到以下数据:

代码语言:javascript
复制
+------------+------+--------------+
|advertiser  |brand |filtered_brand|
+------------+------+--------------+
|Advertiser 1|Brand1|Brand1        |
|Advertiser 1|Brand2|null          |
|Advertiser 2|Brand3|Brand3        |
|Advertiser 2|Brand4|null          |
|Advertiser 3|Brand5|null          |
|Advertiser 3|Brand6|null          |
+------------+------+--------------+

当您将此数据按广告商分组,收集过滤品牌的列表时,您将得到以下数据:

代码语言:javascript
复制
+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|[]      |
|Advertiser 1|[Brand1]|
+------------+--------+

最后,当您应用最后一行将空数组替换为null时,您将得到预期的结果:

代码语言:javascript
复制
+------------+--------+
|advertiser  |brands  |
+------------+--------+
|Advertiser 2|[Brand3]|
|Advertiser 3|null    |
|Advertiser 1|[Brand1]|
+------------+--------+

结论

收集解决方案只创建一个昂贵的混乱步骤(在groupBy期间),如果brands数据较小,则应优先选择。如果您的 dataframe很大,那么连接解决方案就能工作,但是它会创建许多昂贵的混乱步骤,只有一个groupBy和一个groupBy。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69336822

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档