
Replace non-null values of a Spark DataFrame with "1" in Scala, in an optimized way

Stack Overflow user
Asked on 2018-07-13 09:57:36
2 answers · 1.1K views · 0 followers · 0 votes

I have an input Spark DataFrame named freq:

+---------------+----+----+----+----+
|Main_CustomerID|  A1|  A2|  A3|  A4|
+---------------+----+----+----+----+
|            101|null|   2|   1|null|
|            102|   2|null|   2|   4|
|            103|   1|   2|null|   3|
|            104|   2|null|   3|null|
+---------------+----+----+----+----+

Wherever a value in the DataFrame is not null, it needs to be replaced with 1. I did this in Scala like so:

// Skip the first column (Main_CustomerID) and rewrite each remaining column:
// non-null values become 1, nulls are left as-is
val cols = freq.columns.drop(1)
var newfreq = freq
for (column <- cols) {
  newfreq = newfreq.withColumn(column, when(col(column).isNotNull, 1).otherwise(col(column)))
}
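The mutable `var` plus for loop above is the classic shape of a fold; in Spark the same loop could be written as `cols.foldLeft(freq)((df, c) => df.withColumn(c, when(col(c).isNotNull, 1).otherwise(col(c))))`. A minimal pure-Scala sketch of that fold pattern, modeling a row as a `Map` of `Option` cells (an illustration of the pattern only, not the Spark API):

```scala
object FoldSketch {
  // A "row" here is just column name -> optional cell value.
  type Row = Map[String, Option[Int]]

  // Replace every non-null (Some) cell with 1; null (None) cells stay null.
  def markNonNull(row: Row, cols: Seq[String]): Row =
    cols.foldLeft(row) { (acc, c) =>
      acc.updated(c, acc(c).map(_ => 1))
    }
}
```

For example, `FoldSketch.markNonNull(Map("A1" -> None, "A2" -> Some(2)), Seq("A1", "A2"))` leaves `A1` null and turns `A2` into `Some(1)`.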

This gives the resulting DataFrame, newfreq:

+---------------+----+----+----+----+
|Main_CustomerID|  A1|  A2|  A3|  A4|
+---------------+----+----+----+----+
|            101|null|   1|   1|null|
|            102|   1|null|   1|   1|
|            103|   1|   1|null|   1|
|            104|   1|null|   1|null|
+---------------+----+----+----+----+

But is there a way to replace this for loop with a more optimized solution?


2 Answers

Stack Overflow user

Accepted answer

Posted on 2018-07-13 10:07:31

Here is an alternative, optimized approach:

import org.apache.spark.sql.functions._

// Build all the transformed columns up front, then apply them in a single select
val cols = freq.columns.drop(1).toSeq
val selections = Seq(col("id")) ++ cols.map(c => when(col(c).isNotNull, lit(1)).otherwise(col(c)).alias(c))
val freq2 = freq.select(selections : _*)
freq2.show
// +---+----+----+----+----+
// | id|  a1|  a2|  a3|  a4|
// +---+----+----+----+----+
// |101|null|   1|   1|null|
// |102|   1|null|   1|   1|
// |103|   1|   1|null|   1|
// |104|   1|null|   1|null|
// +---+----+----+----+----+
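The `freq.select(selections : _*)` call relies on Scala's `: _*` varargs expansion, which passes the elements of a `Seq` as individual arguments. A minimal sketch of that language feature on its own (no Spark needed; `joinAll` is a made-up helper):

```scala
// `parts: String*` declares a varargs parameter.
def joinAll(parts: String*): String = parts.mkString("|")

val pieces = Seq("id", "a1", "a2")
// `pieces: _*` expands the Seq into individual arguments,
// equivalent to joinAll("id", "a1", "a2").
val joined = joinAll(pieces: _*)
```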

You can compare the execution plans of the two approaches:

scala> newfreq.explain(true)
== Parsed Logical Plan ==
'Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#38]
+- AnalysisBarrier
      +- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
         +- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
            +- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
               +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
   +- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
      +- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
         +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>

scala> freq2.explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None), CASE WHEN isnotnull('a1) THEN 1 ELSE 'a1 END AS a1#46, CASE WHEN isnotnull('a2) THEN 1 ELSE 'a2 END AS a2#47, CASE WHEN isnotnull('a3) THEN 1 ELSE 'a3 END AS a3#48, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#49]
+- AnalysisBarrier
      +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>

The optimized logical plan is the same for both, but this is a cleaner way to do it.

Votes: 2

Stack Overflow user

Posted on 2018-08-18 12:35:34

Have you tried <dataframe>.fillna(1)?

If you only want specific columns, or a different value per column, you can pass a dictionary:

<Dataframe>.fillna({col1:1, col2:0})
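Note that `fillna` is the Python (PySpark) name; in the Scala Dataset API the equivalent is `df.na.fill(1)`, or `df.na.fill(Map("col1" -> 1, "col2" -> 0))` for per-column values. Also worth flagging: filling replaces *nulls* with a value, which is the inverse of the original question's transformation (replace non-nulls with 1 and keep nulls). In `Option` terms, assuming a cell is modeled as `Option[Int]`:

```scala
// The question asks for `map` semantics: non-null -> 1, null stays null.
def markPresent(cell: Option[Int]): Option[Int] = cell.map(_ => 1)

// fillna / na.fill gives `orElse` semantics: null -> 1, non-null unchanged.
def fillMissing(cell: Option[Int]): Option[Int] = cell.orElse(Some(1))
```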

Hope it helps.

Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/51322538