我有大约5个数据集（它们将在未来增长，因此泛化很重要），它们使用相同的标题调用相同的代码库，但我不确定如何确保：加载数据集、调用代码并写入不同的文件夹。如果你能帮上忙，那就太棒了，因为我是Scala的新手。这些是AWS Glue上的作业。唯一变化的是输入文件和结果的位置。
下面是三个示例——我想减少代码的重复：
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /** AWS Glue job: runs Deequ data-quality checks on the Compass
    * contract-portfolio-assignment template CSV and writes the
    * per-constraint results as Parquet to S3.
    *
    * Each template column is validated for data type, maximum length and
    * completeness (the commented-out `isContainedIn` checks are kept as in
    * the original for future use).
    */
  def main(args: Array[String]): Unit = {
    // The unused `val conf = new SparkConf()` from the original was removed:
    // the app name is already set on the session builder below.
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // NOTE(review): this S3 key was line-wrapped in the original paste and has
    // been rejoined onto one line — confirm the exact key against the bucket.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv")

    // Run one Error-level check covering every template column.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        // compute metrics and verify check conditions
        .run()

    // Flatten the constraint outcomes into a DataFrame and persist them.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignment-Compass/")
  }
}
下面是第二个代码库:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /** AWS Glue job: runs Deequ data-quality checks on the GIP
    * contract-portfolio-assignment template CSV and writes the
    * per-constraint results as Parquet to S3.
    *
    * Each template column is validated for data type, maximum length and
    * completeness (the commented-out `isContainedIn` checks are kept as in
    * the original for future use).
    */
  def main(args: Array[String]): Unit = {
    // The unused `val conf = new SparkConf()` from the original was removed:
    // the app name is already set on the session builder below.
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // NOTE(review): this S3 key was line-wrapped in the original paste with a
    // stray space after "templates"; rejoined to match the one-line path shown
    // later in the answer — confirm the exact key against the bucket.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv")

    // Run one Error-level check covering every template column.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        // compute metrics and verify check conditions
        .run()

    // Flatten the constraint outcomes into a DataFrame and persist them.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
  }
}
下面是第三个:
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.{ConstrainableDataTypes}
import org.apache.spark.sql.functions.{length, max}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object Deequ {

  /** AWS Glue job: runs Deequ data-quality checks on the Mobilife
    * portfolio-assignment template CSV and writes the per-constraint
    * results as Parquet to S3.
    *
    * Each template column is validated for data type, maximum length and
    * completeness (the commented-out `isContainedIn` checks are kept as in
    * the original for future use).
    */
  def main(args: Array[String]): Unit = {
    // The unused `val conf = new SparkConf()` from the original was removed:
    // the app name is already set on the session builder below.
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // NOTE(review): this S3 key was line-wrapped in the original paste with a
    // stray space after "templates"; rejoined onto one line — confirm the
    // exact key (including the "Portforlio" spelling) against the bucket.
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Portfolio-Assignment-Mobilife/Mobilife-Portforlio-Assessment - Sheet1.csv")

    // Run one Error-level check covering every template column.
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(
          Check(CheckLevel.Error, "Template Validations")
            .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Contract Category", _ == 1)
            .isComplete("* Contract Category")
            .hasDataType("* Contract ID", ConstrainableDataTypes.String)
            .hasMaxLength("* Contract ID", _ <= 40)
            .isComplete("* Contract ID")
            .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Key Date", _ <= 8)
            .isComplete("* Key Date")
            .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Portfolio Category", _ <= 4)
            .isComplete("* Portfolio Category")
            .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
            .hasMaxLength("* Tranche Start Date", _ <= 8)
            .isComplete("* Tranche Start Date")
            // .isContainedIn("Portfolio Category", Array("2100"))
            .hasDataType("Portfolio", ConstrainableDataTypes.String)
            .hasMaxLength("Portfolio", _ <= 40)
            .isComplete("Portfolio")
            .hasDataType("Source System", ConstrainableDataTypes.String)
            .hasMaxLength("Source System", _ <= 10)
            .isComplete("Source System")
            .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
            .hasDataType("Delivery Package", ConstrainableDataTypes.String)
            .hasMaxLength("Delivery Package", _ <= 20)
            .isComplete("Delivery Package")
            // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
        )
        // compute metrics and verify check conditions
        .run()

    // Flatten the constraint outcomes into a DataFrame and persist them.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write
      .mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/contract-portfolio-assessment_Mobilife-Validations/")
  }
}
发布于 2021-02-24 14:32:06
根据我对你的问题的理解,你可以创建执行通用逻辑的函数,并且可以从不同的地方调用相同的函数。根据不同工作流的不同值,您可以为函数提供多个参数。
// Common helper shown for reference; adapt it as you need.
object CommonHelper {

  /** The shared Deequ template check: every column is validated for data
    * type, maximum length and completeness. Extracted into its own def so
    * the validation rules live in exactly one place.
    */
  private def templateCheck: Check =
    Check(CheckLevel.Error, "Template Validations")
      .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Contract Category", _ == 1)
      .isComplete("* Contract Category")
      .hasDataType("* Contract ID", ConstrainableDataTypes.String)
      .hasMaxLength("* Contract ID", _ <= 40)
      .isComplete("* Contract ID")
      .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Key Date", _ <= 8)
      .isComplete("* Key Date")
      .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Portfolio Category", _ <= 4)
      .isComplete("* Portfolio Category")
      .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
      .hasMaxLength("* Tranche Start Date", _ <= 8)
      .isComplete("* Tranche Start Date")
      // .isContainedIn("Portfolio Category", Array("2100"))
      .hasDataType("Portfolio", ConstrainableDataTypes.String)
      .hasMaxLength("Portfolio", _ <= 40)
      .isComplete("Portfolio")
      .hasDataType("Source System", ConstrainableDataTypes.String)
      .hasMaxLength("Source System", _ <= 10)
      .isComplete("Source System")
      .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))
      .hasDataType("Delivery Package", ConstrainableDataTypes.String)
      .hasMaxLength("Delivery Package", _ <= 20)
      .isComplete("Delivery Package")
      // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))

  /** Reads the CSV template at `sourcePath`, runs the shared Deequ template
    * validations on it, and writes the per-constraint results as Parquet to
    * `targetPath` (overwriting any previous run).
    *
    * Name kept as `ProcessDataSet` (Scala convention would be
    * `processDataSet`) because existing callers use this spelling.
    *
    * @param spark      active SparkSession supplied by the caller
    * @param sourcePath S3 (or other Hadoop-compatible) path of the input CSV
    * @param targetPath S3 folder where the validation results are written
    */
  def ProcessDataSet(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {
    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv(sourcePath)

    // compute metrics and verify check conditions
    val verificationResult: VerificationResult =
      VerificationSuite()
        .onData(dataset)
        .addCheck(templateCheck)
        .run()

    // Flatten the constraint outcomes into a DataFrame and persist them.
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet(targetPath)
  }
}
现在,您可以从启动对象中的main函数调用它,如下所示。我已经为一个数据集展示了它,您可以将其重用于其他数据集。
object Deequ {

  /** Glue entry point. Every dataset is described once as a
    * (source CSV, target folder) pair; adding a new dataset is just one more
    * entry in `jobs` — no duplicated validation code.
    */
  def main(args: Array[String]): Unit = {
    // The unused `val conf = new SparkConf()` from the original was removed:
    // the app name is already set on the session builder below.
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // One entry per dataset: (input CSV path, output folder for the results).
    val jobs = Seq(
      ("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv",
       "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
    )

    // Run the shared validation once per configured dataset.
    for ((sourcePath, targetPath) <- jobs)
      CommonHelper.ProcessDataSet(spark, sourcePath, targetPath)
  }
}
https://stackoverflow.com/questions/66345406
复制相似问题