I have an input Spark DataFrame named freq:
+---------------+----+----+----+----+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+----+----+----+----+
| 101|null| 2| 1|null|
| 102| 2|null| 2| 4|
| 103| 1| 2|null| 3|
| 104| 2|null| 3|null|
+---------------+----+----+----+----+
Wherever a value in the DataFrame is not null, it needs to be replaced with 1. I did this in Scala like so:
val cols = freq.columns.drop(1)
var newfreq = freq
for (column <- cols) {
  newfreq = newfreq.withColumn(column, when(col(column).isNotNull, 1).otherwise(col(column)))
}
This gives me the resulting DataFrame, newfreq:
+---------------+----+----+----+----+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+----+----+----+----+
| 101|null| 1| 1|null|
| 102| 1|null| 1| 1|
| 103| 1| 1|null| 1|
| 104| 1|null| 1|null|
+---------------+----+----+----+----+
But is there a way to replace this for loop with a more optimized solution?
Posted on 2018-07-13 10:07:31
Here is another, more optimized approach:
import org.apache.spark.sql.functions._
val cols = freq.columns.drop(1).toSeq
val selections = Seq(col("id")) ++ cols.map(c => when(col(c).isNotNull, lit(1)).otherwise(col(c)).alias(c))
val freq2 = freq.select(selections : _*)
freq2.show
// +---+----+----+----+----+
// | id| a1| a2| a3| a4|
// +---+----+----+----+----+
// |101|null| 1| 1|null|
// |102| 1|null| 1| 1|
// |103| 1| 1|null| 1|
// |104| 1|null| 1|null|
// +---+----+----+----+----+
You can compare the execution plans of the two approaches:
scala> newfreq.explain(true)
== Parsed Logical Plan ==
'Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#38]
+- AnalysisBarrier
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
+- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
+- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
+- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
+- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>
scala> freq2.explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None), CASE WHEN isnotnull('a1) THEN 1 ELSE 'a1 END AS a1#46, CASE WHEN isnotnull('a2) THEN 1 ELSE 'a2 END AS a2#47, CASE WHEN isnotnull('a3) THEN 1 ELSE 'a3 END AS a3#48, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#49]
+- AnalysisBarrier
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>
The optimized logical plan is the same for both, but this is the cleaner approach.
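If you want to keep the withColumn formulation but avoid the mutable var, the loop can also be written as a foldLeft. A minimal sketch, assuming the same freq DataFrame and imports as above (newfreq2 is a hypothetical name):

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// foldLeft threads the DataFrame through each column transformation,
// replacing the var + for loop with a single immutable expression.
val newfreq2 = freq.columns.drop(1).foldLeft(freq) { (df, c) =>
  df.withColumn(c, when(col(c).isNotNull, lit(1)).otherwise(col(c)))
}
```

As the plans above show, Catalyst collapses the chained Projects into one anyway, so the gain over the for loop is stylistic rather than in execution.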
Posted on 2018-08-18 12:35:34
Have you tried <dataframe>.fillna(1)?
If you only want specific columns, or a different value for each column, you can pass a dictionary:
<Dataframe>.fillna({col1: 1, col2: 0})
Hope it helps.
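Note that fillna replaces null cells, which is the inverse of what the question asks (non-null values become 1), and the snippet above is PySpark syntax. For completeness, the equivalent Scala API is DataFrame.na.fill; a sketch, assuming the same freq DataFrame (filledAll and filledSome are hypothetical names):

```scala
// na.fill replaces nulls, not non-nulls -- shown only to mirror
// the PySpark fillna calls in this answer.
val filledAll  = freq.na.fill(1)                          // nulls in all numeric columns become 1
val filledSome = freq.na.fill(Map("A1" -> 1, "A2" -> 0))  // per-column fill values
```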
https://stackoverflow.com/questions/51322538