首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >现场数据验证的火花数据法

现场数据验证的火花数据法
EN

Stack Overflow用户
提问于 2017-09-10 02:15:15
回答 1查看 10.1K关注 0票数 1

我有一堆列,样本类似我的数据显示如下所示。我需要检查列的错误,并必须生成两个输出文件。我正在使用ApacheSpark2.0,我想以一种高效的方式这样做。

代码语言:javascript
复制
Schema Details
---------------
EMPID - (NUMBER)
ENAME - (STRING,SIZE(50))
GENDER - (STRING,SIZE(1))

Data
----
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,MM
1015,123MYA,F

我的例外输出文件应该如下所示:

代码语言:javascript
复制
1.
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,NULL
1015,NULL,F

2.
EMPID,ERROR_COLUMN,ERROR_VALUE,ERROR_DESCRIPTION
1010,GENDER,"MM","OVERSIZED"
1010,GENDER,"MM","VALUE INVALID FOR GENDER"
1015,ENAME,"123MYA","NAME SHOULD BE A STRING"

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-10 10:14:37

我还没有真正使用Spark2.0,所以我将尝试在Spark1.6中用一个解决方案来回答您的问题。

代码语言:javascript
复制
 // Load you base data
val input  = <<you input dataframe>>

//Extract the schema of your base data
val originalSchema = input.schema

// Modify you existing schema with you additional metadata fields
val modifiedSchema= originalSchema.add("ERROR_COLUMN", StringType, true)
                                  .add("ERROR_VALUE", StringType, true)
                                  .add("ERROR_DESCRIPTION", StringType, true)

// write a custom validation function                                 
def validateColumns(row: Row): Row = {

var err_col: String = null
var err_val: String = null
var err_desc: String = null
val empId = row.getAs[String]("EMPID")
val ename = row.getAs[String]("ENAME")
val gender = row.getAs[String]("GENDER")

// do checking here and populate (err_col,err_val,err_desc) with values if applicable

Row.merge(row, Row(err_col),Row(err_val),Row(err_desc))
}

// Call you custom validation function
val validateDF = input.map { row => validateColumns(row) }  

// Reconstruct the DataFrame with additional columns                      
val checkedDf = sqlContext.createDataFrame(validateDF, newSchema)

// Filter out row having errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)

// Filter our row having no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && !$"ERROR_VALUE".isNull && !$"ERROR_DESCRIPTION".isNull)

我亲自使用过这种方法,它对我很有用。我希望它能给你指明正确的方向。

票数 8
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46136715

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档