I have a Hive table. I want to build dynamic Spark SQL queries at spark-submit time: I pass a rule name, and the query should be generated based on that rule name. For example:

spark-submit <RuleName> IncorrectAge

It should fire my Scala object code:

select tablename, filter, condition from all_rules where rule_name="IncorrectAge"

My rules table (input table):
| rowkey | rule_name    | rule_run_status | tablename       | condition | filter        | level |
|--------|--------------|-----------------|-----------------|-----------|---------------|-------|
| 1      | IncorrectAge | In_Progress     | VDP_Vendor_List | age>18    | gender=Male   | NA    |
| 2      | Customer_age | In_Progress     | Customer_List   | age<25    | gender=Female | NA    |

I look up the rule by its name:
select tablename, filter, condition from all_rules where rule_name="IncorrectAge";

After executing this query, I get a result like this:
| tablename       | filter      | condition |
|-----------------|-------------|-----------|
| VDP_Vendor_List | gender=Male | age>18    |

Now I want to build the Spark SQL queries dynamically:
select count(*) from VDP_Vendor_List                          -- tablename only
select count(*) from VDP_Vendor_List where gender=Male        -- tablename and filter
select * from VDP_Vendor_List where gender=Male AND age>18    -- tablename, filter and condition

My code (Spark 2.2):
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.log4j._

object allrules {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local[*]")
      .appName("Spark Hive")
      .enableHiveSupport().getOrCreate()
    import spark.implicits._

    // For testing purposes I converted the Hive table to JSON data
    val sampleDF = spark.read.json("C:/software/sampletableCopy.json")
    sampleDF.createOrReplaceTempView("sampletable") // registerTempTable is deprecated since Spark 2.0
    val allrulesDF = spark.sql("SELECT * FROM sampletable")
    allrulesDF.show()
    val totalCount: Long = allrulesDF.count()
    println("==============> Total count ======> " + totalCount)

    val df1 = allrulesDF.select(
      allrulesDF.col("tablename"),
      allrulesDF.col("condition"),
      allrulesDF.col("filter"),
      allrulesDF.col("rule_name"))
    df1.show()
    val df2 = df1.where(df1.col("rule_name").equalTo("IncorrectAge"))
    df2.show()

    // My attempt to pull the fields out of the matching row -- does not work:
    // var table_name = ""
    // var condition = ""
    // var filter = ""
    // df1.foreach(row => {
    //   table_name = row.get(1).toString()
    //   condition = row.get(2).toString()
    //   filter = row.get(3).toString()
    // })
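    // Sketch of the extraction step (an assumption, not part of the original post):
    // foreach runs on the executors, so assignments to the driver-side vars above
    // are lost. Instead, collect the single matching row to the driver and
    // interpolate its fields (assumes the referenced table, e.g. VDP_Vendor_List,
    // is resolvable as a Hive table or temp view):
    val row = df2.select("tablename", "filter", "condition").first()
    val (tableName, filterExpr, conditionExpr) =
      (row.get(0).toString, row.get(1).toString, row.get(2).toString)
    spark.sql(s"select count(*) from $tableName").show()
    spark.sql(s"select count(*) from $tableName where $filterExpr").show()
    spark.sql(s"select * from $tableName where $filterExpr AND $conditionExpr").show()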
  }
}

Posted on 2018-10-09 04:55:05
You can pass parameters from spark-submit to your application:
bin/spark-submit --class allrules something.jar tablename filter condition

Then, in your main function, you will have your params:
def main(args: Array[String]): Unit = {
  // args(0), args(1), ... are your params
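  // For example (a sketch; the argument layout is an assumption, not from the
  // original answer): take the rule name from args(0) and build the metadata query.
  val ruleName = args(0) // e.g. "IncorrectAge"
  val metaSql = s"select tablename, filter, condition from all_rules where rule_name='$ruleName'"
  println(metaSql) // hand this string to spark.sql(...) once a SparkSession exists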
}

Posted on 2019-01-02 22:32:48
You can pass arguments to the driver class like this:
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object DriverClass {
  val log = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .config("spark.sql.warehouse.dir", "path")
      .enableHiveSupport()
      .getOrCreate()

    if (args == null || args.isEmpty || args.length != 2) {
      log.error("Invalid number of arguments passed.")
      log.error("Arguments Usage: <Rule Name> <Rule Type>")
      log.error("Stopping the flow")
      System.exit(1)
    }
    import spark.implicits._

    val ruleName: String = String.valueOf(args(0).trim())
    val ruleType: String = String.valueOf(args(1).trim())

    // e.g. args "rule_name" and "IncorrectAge" produce: where rule_name='IncorrectAge'
    val processSQL: String = s"select tablename, filter, condition from all_rules where $ruleName='$ruleType'"
    val metadataDF = spark.sql(processSQL)

    // Collect the single metadata row to the driver
    val (tblnm, fltr, cndtn) = metadataDF.rdd
      .map(f => (f.get(0).toString(), f.get(1).toString(), f.get(2).toString()))
      .collect()(0)

    val finalSql_1 = s"select count(*) from $tblnm"                 // tablename only
    val finalSql_2 = s"select count(*) from $tblnm where $fltr"     // tablename and filter
    val finalSql_3 = s"select * from $tblnm where $fltr AND $cndtn" // tablename, filter and condition
    spark.sql(finalSql_1).show()
    spark.sql(finalSql_2).show()
    spark.sql(finalSql_3).show()
  }
}

https://stackoverflow.com/questions/52708216
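For reference, this driver might be launched like the earlier answer's command (a sketch; the jar name and the column-name/value argument layout are assumptions read from processSQL above):

bin/spark-submit --class DriverClass your-app.jar rule_name IncorrectAge

With those arguments, processSQL becomes select tablename, filter, condition from all_rules where rule_name='IncorrectAge', matching the metadata query in the question.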