
How to reduce code duplication with AWS Deequ

Stack Overflow user
Asked on 2021-02-24 13:53:56
1 answer · 132 views · 0 following · 0 votes

I have about 5 datasets (they will grow in the future, so generalizing is important) that run the same codebase against files with the same headers, but I am not sure how to structure things so that I can:

load a dataset

call the code and write the results to a different folder. It would be great if you could help, as I am new to Scala. These are jobs on AWS Glue; the only things that change are the input file and the location of the results.

Here are three examples -- I would like to reduce the duplication between them:

Code (Scala):
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

object Deequ {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract_Portfolio_Assignement_Compass/contract-portfolio-assignment-compass - Sheet1.csv")

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")
          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignment-Compass/")
  }
}

Here is the second codebase:

Code (Scala):
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

object Deequ {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv")

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")
          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/")
  }
}

And here is the third:

Code (Scala):
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

object Deequ {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Portfolio-Assignment-Mobilife/Mobilife-Portforlio-Assessment - Sheet1.csv")

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")
          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite")
      .parquet("s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/contract-portfolio-assessment_Mobilife-Validations/")
  }
}

1 Answer

Stack Overflow user

Accepted answer

Answered on 2021-02-24 14:32:06

As I understand your question, you can create a function that performs the common logic and call that same function from different places. You can give the function parameters for the values that differ between workflows.

Code (Scala):
// Common function, just for your reference, but you can modify it as you want.
import com.amazon.deequ.{VerificationResult, VerificationSuite}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes
import org.apache.spark.sql.SparkSession

object CommonHelper {
  def ProcessDataSet(spark: SparkSession, sourcePath: String, targetPath: String): Unit = {

    val dataset = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv(sourcePath)

    val verificationResult: VerificationResult = VerificationSuite()
      // data to run the verification on
      .onData(dataset)
      // define a data quality check
      .addCheck(
        Check(CheckLevel.Error, "Template Validations")
          .hasDataType("* Contract Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Contract Category", _ == 1)
          .isComplete("* Contract Category")

          .hasDataType("* Contract ID", ConstrainableDataTypes.String)
          .hasMaxLength("* Contract ID", _ <= 40)
          .isComplete("* Contract ID")

          .hasDataType("* Key Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Key Date", _ <= 8)
          .isComplete("* Key Date")

          .hasDataType("* Portfolio Category", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Portfolio Category", _ <= 4)
          .isComplete("* Portfolio Category")

          .hasDataType("* Tranche Start Date", ConstrainableDataTypes.Integral)
          .hasMaxLength("* Tranche Start Date", _ <= 8)
          .isComplete("* Tranche Start Date")
          // .isContainedIn("Portfolio Category", Array("2100"))

          .hasDataType("Portfolio", ConstrainableDataTypes.String)
          .hasMaxLength("Portfolio", _ <= 40)
          .isComplete("Portfolio")

          .hasDataType("Source System", ConstrainableDataTypes.String)
          .hasMaxLength("Source System", _ <= 10)
          .isComplete("Source System")
          .isContainedIn("Source System", Array("LFST", "CLPB", "CLCB", "CLHR", "CCLU"))

          .hasDataType("Delivery Package", ConstrainableDataTypes.String)
          .hasMaxLength("Delivery Package", _ <= 20)
          .isComplete("Delivery Package")
          // .isContainedIn("Legal Entity", Array("LP01", "LLAL"))
      )
      // compute metrics and verify check conditions
      .run()

    // val metrics1 = successMetricsAsDataFrame(spark, analysisResult1)
    val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
    resultDataFrame.write.mode("overwrite").parquet(targetPath)
  }
}

Now you can call it from the main function in your launcher object, as shown below. I have shown it for one dataset; you can reuse it for the other datasets.

Code (Scala):
object Deequ {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    val sourcePath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/templates/Contract_Portfolio_Assignment/Contract-Portfolio-Assignment-GIP/Portfolio-Assignment-GIP - Sheet1.csv"
    val targetPath1 = "s3://ct-ire-fin-stg-data-dev-raw-gib/template_validations/Contract-Portfolio-Assignment-Validations/Contract-Portfolio-Assignement-GIP-Validations/"

    // You can call this function from multiple places, depending on how you want to use it.
    CommonHelper.ProcessDataSet(spark, sourcePath1, targetPath1)
  }
}
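
Since the only things that change between jobs are the input file and the result location, one way to scale this as the datasets grow is to drive the helper from a list of (source, target) pairs, so that adding a sixth dataset is just one more entry. Below is a minimal sketch under that assumption; the bucket and key names are illustrative placeholders, not the real S3 locations:

Code (Scala):
import org.apache.spark.sql.SparkSession

object DeequBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dq").getOrCreate()

    // Each entry pairs an input CSV with the folder its validation results are written to.
    // These paths are illustrative placeholders; substitute the real S3 locations.
    val datasets = Seq(
      ("s3://my-bucket/templates/dataset-1.csv", "s3://my-bucket/validations/dataset-1/"),
      ("s3://my-bucket/templates/dataset-2.csv", "s3://my-bucket/validations/dataset-2/")
    )

    // Run the shared Deequ validation once per (source, target) pair.
    datasets.foreach { case (sourcePath, targetPath) =>
      CommonHelper.ProcessDataSet(spark, sourcePath, targetPath)
    }
  }
}

Alternatively, if each dataset should remain its own Glue job, the same script could be deployed for every job with the paths supplied as job parameters rather than hard-coded. A sketch assuming each job defines --SOURCE_PATH and --TARGET_PATH parameters (the parameter names are hypothetical):

Code (Scala):
import com.amazonaws.services.glue.util.GlueArgParser
import org.apache.spark.sql.SparkSession

object DeequParameterized {
  def main(sysArgs: Array[String]): Unit = {
    // Resolve the hypothetical --SOURCE_PATH and --TARGET_PATH job parameters.
    val resolved = GlueArgParser.getResolvedOptions(sysArgs, Array("SOURCE_PATH", "TARGET_PATH"))

    val spark = SparkSession.builder().appName("dq").getOrCreate()
    CommonHelper.ProcessDataSet(spark, resolved("SOURCE_PATH"), resolved("TARGET_PATH"))
  }
}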
Votes: 0
The original content of this page was provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66345406
