
spark sql - How to write a dynamic query in spark sql

Stack Overflow user
Asked on 2018-10-09 02:34:22
2 answers · 3.3K views · 0 following · Score: 0

I have a Hive table. I want to build dynamic Spark SQL queries. At spark-submit time I pass a rule name, and the query should be generated based on that rule name. For example:

spark-submit <RuleName> IncorrectAge

It should trigger my Scala object code:

select tablename, filter, condition from all_rules where rulename="IncorrectAge"

My table: rules (the input table)

|rowkey|rule_name    |rule_run_status|tablename      |condition|filter       |level|
|------|-------------|---------------|---------------|---------|-------------|-----|
|1     |IncorrectAge |In_Progress    |VDP_Vendor_List|age>18   |gender=Male  |NA   |
|2     |Customer_age |In_Progress    |Customer_List  |age<25   |gender=Female|NA   |

I fetch the rule name with:

 select tablename, filter, condition from all_rules where rulename="IncorrectAge";

After executing this query, I get a result like:

   |----------------------------------------------|
   |tablename        | filter         | condition |
   |----------------------------------------------|
   |VDP_Vendor_List  | gender=Male     | age>18   |
   |----------------------------------------------|

Now I want to build the Spark SQL queries dynamically:

select count(*) from VDP_Vendor_List                     -- tablename (first column)
select count(*) from VDP_Vendor_List where gender=Male   -- tablename and filter
select * from EMP where gender=Male AND age>18           -- tablename, filter, condition
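Assembling those three query variants is plain string interpolation once the metadata row is in hand. A minimal plain-Scala sketch (`buildQueries` is a hypothetical helper, not from the question; the quoting of `gender='Male'` is an assumption, since string literals in SQL need quotes):

```scala
// Hypothetical helper: builds the three query variants from one metadata row.
// Column names (tablename, filter, condition) follow the result table above.
def buildQueries(tablename: String, filter: String, condition: String): (String, String, String) = {
  val q1 = s"select count(*) from $tablename"                        // tablename only
  val q2 = s"select count(*) from $tablename where $filter"          // tablename and filter
  val q3 = s"select * from $tablename where $filter and $condition"  // all three
  (q1, q2, q3)
}

val (q1, q2, q3) = buildQueries("VDP_Vendor_List", "gender='Male'", "age>18")
println(q1) // select count(*) from VDP_Vendor_List
```

Each resulting string can then be handed to `spark.sql(...)`.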

My code (Spark 2.2):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.log4j._

object allrules {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local[*]")
      .appName("Spark Hive")
      .enableHiveSupport().getOrCreate()

    import spark.implicits._
    // For testing purposes I exported the Hive table to JSON
    val sampleDF = spark.read.json("C:/software/sampletableCopy.json")
    sampleDF.createOrReplaceTempView("sampletable") // registerTempTable is deprecated in Spark 2.x
    val allrulesDF = spark.sql("SELECT * FROM sampletable")

    allrulesDF.show()
    println("==============> Total count ======> " + allrulesDF.count())

    val df1 = allrulesDF.select($"tablename", $"condition", $"filter", $"rule_name")
    df1.show()

    // Keep the DataFrame: .show() returns Unit, so chaining it loses the result
    val df2 = df1.where($"rule_name" === "IncorrectAge")
    df2.show()

    // Mutating driver-side vars inside df1.foreach does not work: the closure
    // runs on the executors, so the driver's copies stay unchanged.
    // Collect the matching row to the driver instead, e.g.:
    // val Row(tablename, condition, filter, _) = df2.head()
  }
}
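Setting Spark aside for a moment, the driver-side extraction that the commented-out `foreach` block attempts can be sketched with a plain collection. The rows mirror the `all_rules` table above; the variable names are illustrative:

```scala
// Each tuple: (rule_name, tablename, condition, filter) -- mirrors the rules table
val rules = Seq(
  ("IncorrectAge", "VDP_Vendor_List", "age>18", "gender=Male"),
  ("Customer_age", "Customer_List", "age<25", "gender=Female")
)

// Find the row for the requested rule and unpack its fields on the driver
val (ruleName, tablename, condition, filter) =
  rules.find(_._1 == "IncorrectAge").getOrElse(sys.error("rule not found"))

val dynamicSql = s"select * from $tablename where $filter AND $condition"
println(dynamicSql) // select * from VDP_Vendor_List where gender=Male AND age>18
```

With a real DataFrame the same shape is obtained via `df2.head()` or `df2.collect()(0)` and then reading the row's fields.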

2 Answers

Stack Overflow user
Answered on 2018-10-09 04:55:05

You can pass parameters from spark-submit to your application:

bin/spark-submit --class allrules something.jar tablename filter condition

Then, in your main function, you will have your params:

def main(args: Array[String]) : Unit = {

   // args(0), args(1) ... there are your params

}
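A minimal sketch of consuming those arguments in the question's setup, where arg 0 would be the rule name (the array here simulates what spark-submit passes after the jar; the usage string is an assumption):

```scala
// Simulated spark-submit arguments: everything after the jar name
val submitArgs = Array("IncorrectAge")

// Validate before use, failing fast with a usage hint
require(submitArgs.length == 1,
  "Usage: spark-submit --class allrules app.jar <RuleName>")
val ruleNameArg = submitArgs(0).trim

// Build the metadata lookup query from the argument
val metadataSql =
  s"select tablename, filter, condition from all_rules where rule_name='$ruleNameArg'"
println(metadataSql)
```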
Score: 0

Stack Overflow user
Answered on 2019-01-02 22:32:48

You can pass parameters to the driver class as shown below:

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object DriverClass {
  val log = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("yarn")
      .config("spark.sql.warehouse.dir", "path")
      .enableHiveSupport().getOrCreate()
    if (args == null || args.isEmpty || args.length != 2) {
      log.error("Invalid number of arguments passed.")
      log.error("Arguments Usage: <Rule Name> <Rule Type>") // closing quote was missing
      log.error("Stopping the flow")
      System.exit(1)
    }
    import spark.implicits._
    val ruleName: String = args(0).trim()
    val ruleType: String = args(1).trim()
    // Note the s prefix: without it $ruleName/$ruleType are not interpolated.
    // The value side is quoted since it is a SQL string literal.
    val processSQL: String =
      s"select tablename, filter, condition from all_rules where $ruleName='$ruleType'"
    val metadataDF = spark.sql(processSQL)
    val (tblnm, fltr, cndtn) = metadataDF.rdd
      .map(f => (f.get(0).toString, f.get(1).toString, f.get(2).toString))
      .collect()(0)
    val finalSql_1 = s"select count(*) from $tblnm"             // first column
    val finalSql_2 = s"select count(*) from $tblnm where $fltr" // stray quote removed
    val finalSql_3 = s"select * from EMP where $fltr AND $cndtn"
    spark.sql(finalSql_1).show()
    spark.sql(finalSql_2).show()
    spark.sql(finalSql_3).show()
  }
}
Score: 0
Original page content provided by Stack Overflow; translation supported by Tencent Cloud.
Original link: https://stackoverflow.com/questions/52708216