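I created a function that checks whether a DataFrame contains repeated values, based on a Seq of columns.
I want to implement an "ignoreNulls" option, passed to the function as a boolean parameter.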
If it is true, rows whose key is null should not be flagged as repeated.
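I don't know how to do this. Should I use if or case? Is there an expression that ignores null values in the partitionBy statement?
Can anyone help me?

This is the current function: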
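import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum}

def checkRepeatedKey(newColName: String, keys: Seq[String])(dataframe: DataFrame): DataFrame = {
  // A key is repeated when its window partition holds more than one row.
  val repeatedCondition = col("sum") > 1
  // Partition by all key columns.
  val windowCondition = Window.partitionBy(keys.head, keys.tail: _*)
  dataframe
    .withColumn("count", lit(1))
    .withColumn("sum", sum("count").over(windowCondition))
    .withColumn(newColName, repeatedCondition)
    .drop("count", "sum")
}

Some test data: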
val testDF = Seq(
  ("1", Some("name-1")),
  ("2", Some("repeated-name")),
  ("3", Some("repeated-name")),
  ("4", Some("name-4")),
  ("5", None),
  ("6", None)
).toDF("name_key", "name")

Testing the function:
val results = testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name")))

Output (without ignoreNulls implemented):
+--------+-------------+-----------------+
|name_key|name         |has_repeated_name|
+--------+-------------+-----------------+
|1       |name-1       |false            |
|2       |repeated-name|true             |
|3       |repeated-name|true             |
|4       |name-4       |false            |
|5       |null         |true             |
|6       |null         |true             |
+--------+-------------+-----------------+

With ignoreNulls = true implemented, it should look like this:
// function header with the ignoreNulls parameter
def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean)(dataframe: DataFrame): DataFrame = ???
// using the function, passing true for ignoreNulls
testDF.transform(checkRepeatedKey("has_repeated_name", Seq("name"), true))
// expected output for nulls
+--------+-------------+-----------------+
|5       |null         |false            |
|6       |null         |false            |
+--------+-------------+-----------------+

Answered on 2022-10-29 10:33:51
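First of all, you should define the logic precisely for the case where only some of the columns in keys are null: should the key count as null then, or only when all of the columns in keys are null?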
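The two interpretations can be written as column expressions. The following is only a sketch: the object NullKeyChecks and the helpers anyKeyIsNull and allKeysAreNull are hypothetical names that do not appear in the question, and keys is assumed to be non-empty (the question's code makes the same assumption by calling keys.head).

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Hypothetical helpers for illustration; not part of the original question or answer.
object NullKeyChecks {

  // Interpretation 1: the key counts as null if ANY of its columns is null.
  def anyKeyIsNull(keys: Seq[String]): Column =
    keys.map(k => col(k).isNull).reduce(_ || _)

  // Interpretation 2: the key counts as null only if ALL of its columns are null.
  def allKeysAreNull(keys: Seq[String]): Column =
    keys.map(k => col(k).isNull).reduce(_ && _)
}
```

With a single key column both helpers reduce to col(keys.head).isNull, so the choice only matters for composite keys.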
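For the sake of simplicity, let's assume there is only one column in keys (you can easily extend the logic to multiple columns). You just need to add a simple if to your checkRepeatedKey function: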
def checkIfNullValue(keys: Seq[String]): Column = {
  // for the sake of simplicity checking only the first key
  col(keys.head).isNull
}
def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean)(dataframe: DataFrame): DataFrame = {
  ...
  ...
  val df = dataframe
    .withColumn("count", lit(1))
    .withColumn("sum", sum("count").over(windowCondition))
    .withColumn(newColName, repeatedCondition)
    .drop("count", "sum")
  if (ignoreNulls)
    // rows with a null key are never flagged; all other rows keep their computed flag
    df.withColumn(newColName, when(checkIfNullValue(keys), lit(false)).otherwise(df(newColName)))
  else df
}

https://stackoverflow.com/questions/74226015
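On the question of whether the null handling can live in the window expression itself: partitionBy has no null-skipping option that I am aware of, but count ignores null inputs, so one possible alternative is to count a conditional value over the window. The sketch below is not the answer's method. It assumes a key is null when any of its columns is null, and the names RepeatedKeyExample and keyIsNull are made up for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, when}

// Hypothetical wrapper object; only checkRepeatedKey's signature comes from the question.
object RepeatedKeyExample {

  // Assumption: a key counts as null when ANY of its columns is null.
  private def keyIsNull(keys: Seq[String]): Column =
    keys.map(k => col(k).isNull).reduce(_ || _)

  def checkRepeatedKey(newColName: String, keys: Seq[String], ignoreNulls: Boolean)(dataframe: DataFrame): DataFrame = {
    val window = Window.partitionBy(keys.map(k => col(k)): _*)
    // when(...) without otherwise(...) yields null for non-matching rows, and
    // count skips nulls, so with ignoreNulls = true a null key contributes 0.
    val counted: Column =
      if (ignoreNulls) when(!keyIsNull(keys), lit(1)) else lit(1)
    dataframe.withColumn(newColName, count(counted).over(window) > 1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("repeated-key-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same test data as in the question.
    val testDF = Seq(
      ("1", Some("name-1")),
      ("2", Some("repeated-name")),
      ("3", Some("repeated-name")),
      ("4", Some("name-4")),
      ("5", None),
      ("6", None)
    ).toDF("name_key", "name")

    testDF
      .transform(checkRepeatedKey("has_repeated_name", Seq("name"), true))
      .orderBy("name_key")
      .show()

    spark.stop()
  }
}
```

With ignoreNulls = true only the two "repeated-name" rows should be flagged; with false the two null rows are flagged as well, which matches the output shown in the question. This variant also avoids the temporary "count" and "sum" columns.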