I fetch tables from SQL Server, process and convert them to a JSON file in a Spark application, and then store the result in MongoDB. I exported a JSON file in which ACCOUNTNO is unique; one ACCOUNTNO may have one or more VEHICLENUMBERs, so I created an array per unique ACCOUNTNO and placed the multiple VEHICLENUMBER rows as separate objects inside the VEHICLE array.
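This grouping (one document per unique ACCOUNTNO holding an array of vehicle objects) can be modeled in plain Scala collections as a sanity check. The case classes and sample values below are illustrative only, trimmed to two fields:

```scala
// Minimal model of the target shape: group rows by account number and
// collect each account's vehicles into one array. Field names follow the
// post; the case classes themselves are illustrative.
case class Vehicle(vehicleNumber: String, customerId: Long)
case class Account(accountNo: Long, vehicle: Seq[Vehicle])

val rows = Seq(
  (10003018L, Vehicle("MH05AM902", 20000004L)),
  (10003019L, Vehicle("GJ15CF7747", 20009020L)),
  (10003019L, Vehicle("GJ15CB727", 20000008L))
)

val accounts: Seq[Account] = rows
  .groupBy(_._1)                                        // one group per ACCOUNTNO
  .map { case (acc, vs) => Account(acc, vs.map(_._2)) } // vehicles -> array
  .toSeq
  .sortBy(_.accountNo)
// accounts(0) holds one vehicle, accounts(1) holds two
```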
I used the following query:
val res00 = sparksessionobject.sqlContext.sql("SELECT ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE FROM (SELECT *, row_number() OVER (PARTITION BY VEHICLENUMBER ORDER BY TAGSTARTEFFDATE DESC) AS rn FROM tp_customer_account) tmp WHERE rn=1 GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO")
res00.coalesce(1).write.json("D:/res39")
My JSON file looks like this:
ACCOUNTNO : 10003018
VEHICLE : Array
0 : Object
ACCOUNTNO : 10003019
VEHICLE : Array
0 : Object
1 : Object
2 : Object
ACCOUNTNO : 10003020
VEHICLE : Array
0 : Object
VEHICLENUMBER : "MH01AX5658"
CUSTOMERID : 20000006
ACCOUNTGROUPID : 21
PREPAIDACCOUNTSTATUSID : 3079
PREPAIDACCOUNTSTATUSDATE : 2015-09-16 14:58:27.593
SOURCEOFENTRY : "RegularRetailer"
REVENUECATEGORYID : 75
VEHICLECLASS : "4"
SERIALNO : "137438955930"
HEXTAGID : "9189070480200000099A"
TAGSTATUS : "TAGINACTIVE"
TAGSTARTEFFDATE : 2013-06-16 12:27:34.997
TAGENDEFFDATE : 2015-09-16 15:21:39.960
ISTAGBLACKLISTED : true
ISBLACKLISTHOLD : false
EMAILADDRESS : "prabjyotsingh.kalsi@yahoo.com ..."
PHONENUMBER : "09909913811 "
CREATEDDATE : 2013-06-16 12:12:37.327
CREATEDUSER : "bhagwadapos"
UPDATEDDATE : 2015-09-16 15:06:39.960
UPDATEDUSER : "BLTagProcess"
Another view of the same JSON document in the MongoDB database, using pretty():
{
"_id" : ObjectId("5b5ac17b0fef3110a8eb2319"),
"ACCOUNTNO" : NumberLong(10003014),
"VEHICLE" : [
{
"VEHICLENUMBER" : "MH43AJ411",
"CUSTOMERID" : NumberLong(20000001),
"ACCOUNTGROUPID" : 15,
"PREPAIDACCOUNTSTATUSID" : 3079,
"PREPAIDACCOUNTSTATUSDATE" : ISODate("2015-09-16T09:28:27.500Z"),
"SOURCEOFENTRY" : "RegularRetailer",
"REVENUECATEGORYID" : 75,
"VEHICLECLASS" : "4",
"SERIALNO" : "206158433290",
"HEXTAGID" : "91890704803000000C0A",
"TAGSTATUS" : "TAGINACTIVE",
"TAGSTARTEFFDATE" : ISODate("2014-08-08T08:54:12.227Z"),
"TAGENDEFFDATE" : ISODate("2015-09-16T09:51:42.437Z"),
"ISTAGBLACKLISTED" : true,
"ISBLACKLISTHOLD" : false,
"EMAILADDRESS" : "shankarn75@rediffmail.com ",
"PHONENUMBER" : "9004419178 ",
"CREATEDDATE" : ISODate("2013-06-07T07:26:16.650Z"),
"CREATEDUSER" : "bhagwadapos",
"UPDATEDDATE" : ISODate("2015-09-16T09:36:42.437Z"),
"UPDATEDUSER" : "BLTagProcess"
}
]
}
{
"_id" : ObjectId("5b5ac17b0fef3110a8eb231a"),
"ACCOUNTNO" : NumberLong(10003015),
"VEHICLE" : [
{
"VEHICLENUMBER" : "MH12GZ3392",
"CUSTOMERID" : NumberLong(20000002),
"ACCOUNTGROUPID" : 16,
"PREPAIDACCOUNTSTATUSID" : 2079,
"PREPAIDACCOUNTSTATUSDATE" : ISODate("2013-06-07T07:44:13.903Z"),
"SOURCEOFENTRY" : "RegularRetailer",
"REVENUECATEGORYID" : 75,
"VEHICLECLASS" : "4",
"SERIALNO" : "137438955875",
"HEXTAGID" : "91890704802000000963",
"TAGSTATUS" : "Assigned",
"TAGSTARTEFFDATE" : ISODate("2013-06-07T07:47:11.550Z"),
"TAGENDEFFDATE" : ISODate("2018-06-06T18:29:59.997Z"),
"ISTAGBLACKLISTED" : false,
"ISBLACKLISTHOLD" : false,
"EMAILADDRESS" : "hiteshmpatil@gmail.com ",
"PHONENUMBER" : "9823131243 ",
"CREATEDDATE" : ISODate("2013-06-07T07:45:29.337Z"),
"CREATEDUSER" : "bhagwadapos",
"UPDATEDDATE" : ISODate("2013-06-07T07:45:29.337Z"),
"UPDATEDUSER" : "bhagwadapos"
}
]
}
I also have three more tables in SQL Server: an emails table, a logins table, and a phones table. All three tables contain ACCOUNTNO, so I have to insert the rows that share the same ACCOUNTNO across the three tables into the single JSON document.
Therefore I need to add three more arrays to the JSON document, after ACCOUNTNO and before VEHICLE, so it should look like this:
ACCOUNTNO : 10003018
Email : Array
0 : Object
Login : Array
0 : Object
Phones : Array
0 : Object
VEHICLE : Array
0 : Object
TP_CUSTOMER_LOGINS table view (columns and one row of values):
LOGINID ACCOUNTNO USERNAME PASSWORD LAST_LOGINDATE LAST_PWD_MODIFIEDDATE CURRENT_PWD_EXPIRYDATE PWD_ATTEMPTS_COUNT PINNUMBER ISLOCKED CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER THEMES LANGUAGES STATUSID USERTYPEID ROLENAME SQ_ATTEMPTCOUNT SQ_LOCKOUTTIME
41118 10076338 user1212 passpasspasspass 27:23.2 20:29.0 20:12.8 0 0 20:29.0 deenkapoor 27:39.5 deenkapoor Maroon en-IN 2111 2 NULL NULL
TP_CUSTOMER_EMAILS table view (columns and one row of values):
CUSTMAILID ACCOUNTNO EMAILTYPE EMAILADDRESS ISACTIVE ISCOMMUNICATION CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER
38404 10078435 PrimaryEmail something.ok@gmail.com 1 1 26:36.3 System 26:36.3 System
TP_CUSTOMER_PHONES table view (columns and one row of values):
CUSTPHONEID ACCOUNTNO PHONETYPE PHONENUMBER EXTENTION ISACTIVE ISCOMMUNICATION CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER
91831 10078435 MobileNo 9999999999 1 1 26:36.3 System 26:36.3 System
I have to join these in a SparkSQL query. I need help with this. Thank you.
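One way to get those extra arrays, sketched below on the assumption that each table is available as a DataFrame: aggregate each child table into an array of structs per ACCOUNTNO first, and only then left-join the pre-aggregated results onto the account documents. Pre-aggregating avoids the row multiplication and duplicate column names that a flat five-way join would cause. The DataFrame and function names here are assumptions, not the post's own code:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_set, struct}

// Collapse a child table into one row per ACCOUNTNO holding an array of
// structs built from all of its remaining columns.
def childArray(df: DataFrame, name: String): DataFrame = {
  val fields = df.columns.filter(_ != "ACCOUNTNO").map(col)
  df.groupBy("ACCOUNTNO").agg(collect_set(struct(fields: _*)).as(name))
}

// vehicles, emails, logins, phones: DataFrames loaded via the JDBC reader.
// Left joins keep accounts that have no row in some child table.
def buildDocuments(vehicles: DataFrame, emails: DataFrame,
                   logins: DataFrame, phones: DataFrame): DataFrame =
  childArray(vehicles, "VEHICLE")
    .join(childArray(emails, "Email"), Seq("ACCOUNTNO"), "left")
    .join(childArray(logins, "Login"), Seq("ACCOUNTNO"), "left")
    .join(childArray(phones, "Phones"), Seq("ACCOUNTNO"), "left")
```

Joining on `Seq("ACCOUNTNO")` rather than an expression keeps a single ACCOUNTNO column in the output instead of two ambiguous ones.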
UPDATE! I tried a two-table join. The SQL query I wrote works in SQL Server, but it does not work in SparkSQL.
Code:
package com.issuer.pack3.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object sqltojson {
def main(args:Array[String])
{
System.setProperty("hadoop.home.dir", "C:/winutil/")
val db = "ISSUER"
val table1 = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
val custinfo1 = "[PLAY].TP_CUSTOMER_ADDRESSES"
val conf = new SparkConf().setAppName("SQLtoJSON").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val jdbcSqlConnStr = s"jdbc:sqlserver://192.168.70.15;databaseName=$db;user=bhaskar;password=welcome123;"
val jdbcDbTable1 = table1
val jdbcDbTable2 = custinfo1
val jdbcDF1 = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr,"dbtable" -> jdbcDbTable1)).load()
val jdbcDF2 = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr,"dbtable" -> jdbcDbTable2)).load()
//jdbcDF.show(10)
//jdbcDF.printSchema
jdbcDF1.registerTempTable("customer_account")
jdbcDF2.registerTempTable("customer_address")
val query1 = "WITH RowNumberedAccounts AS( select O.CUSTADDRESSID ,O.ADDRESSTYPE ,O.ADDRESSLINE1 ,O.ADDRESSLINE2 ,O.ADDRESSLINE3 ,O.CITY,O.STATE ,O.COUNTRY ,O.ZIP1 ,O.ISACTIVE ,O.ISCOMMUNICATION ,O.CREATEDDATE ,O.CREATEDUSER ,O.UPDATEDDATE ,O.UPDATEDUSER ,O.REASONCODE ,O.ZIP2 ,C.ACCOUNTNO as C_ACCNO ,C.CUSTOMERID ,C.ACCOUNTGROUPID ,C.PREPAIDACCOUNTSTATUSID ,C.PREPAIDACCOUNTSTATUSDATE ,C.SOURCEOFENTRY ,C.REVENUECATEGORYID ,C.VEHICLENUMBER ,C.VEHICLECLASS ,C.SERIALNO ,C.HEXTAGID ,C.TAGSTATUS ,C.TAGSTARTEFFDATE ,C.TAGENDEFFDATE ,C.ISTAGBLACKLISTED ,C.ISBLACKLISTHOLD ,C.RCVERIFICATIONSTATUS ,C.EMAILADDRESS ,C.PHONENUMBER ,C.CREATEDDATE AS CCreatedDate ,C.CREATEDUSER AS CCreatedUser ,C.UPDATEDDATE AS CUpdatedDate ,C.UPDATEDUSER AS CUpdatedUser ,C.HISTID ,C.ACTION ,C.ISFEEWAIVER ,C.FEEWAIVERPASSTYPE ,C.VEHICLEIMGVERIFICATIONSTATUS ,C.TAGTID ,C.ISREVENUERECHARGE , ROW_NUMBER() OVER ( PARTITION BY C.VEHICLENUMBER ORDER BY C.TAGSTARTEFFDATE DESC) AS rn from customer_account c INNER join customer_address o on c.ACCOUNTNO = o.ACCOUNTNO)SELECT R.* FROM RowNumberedAccounts AS R WHERE rn = 1 order by C_ACCNO"
val res00 = sqlContext.sql(query1.toString)
res00.registerTempTable("joined_acc_add")
res00.show(10)
val query2 = "SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,O.STATE ,COUNTRY ,ZIP1 ,ISACTIVE ,ISCOMMUNICATION ,CREATEDDATE ,CREATEDUSER ,UPDATEDDATE ,UPDATEDUSER ,REASONCODE ,ZIP2)) as ADDRESS FROM joined_acc_add GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO"
val res01 = sqlContext.sql(query2.toString)
res01.coalesce(1).write.json("D:/result01")
//###########################
// jdbcDF.withColumn("VEHICLE",struct("VEHICLENUMBER","CUSTOMERID")) // withColumn for Add or replace Columns, struct for Creates a new struct column.
// .select("VEHICLE","ACCOUNTNO")
// .groupBy("ACCOUNTNO")
// .agg(collect_set("VEHICLE").as("VEHICLE")). //collect_set(Column e) It's an Aggregate function: returns a set of objects with duplicate elements eliminated.
// orderBy("ACCOUNTNO").
// coalesce(1).write.json("D:/res10")
}
}
Error stack trace:
18/08/07 13:31:25 INFO DAGScheduler: Job 0 finished: show at sqltojson.scala:40, took 99.958582 s
18/08/07 13:31:25 INFO CodeGenerator: Code generated in 11.907295 ms
18/08/07 13:31:25 INFO SparkSqlParser: Parsing command: SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,O.STATE ,COUNTRY ,ZIP1 ,ISACTIVE ,ISCOMMUNICATION ,CREATEDDATE ,CREATEDUSER ,UPDATEDDATE ,UPDATEDUSER ,REASONCODE ,ZIP2)) as ADDRESS FROM joined_acc_add GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
|CUSTADDRESSID|ADDRESSTYPE| ADDRESSLINE1| ADDRESSLINE2|ADDRESSLINE3| CITY|STATE|COUNTRY| ZIP1|ISACTIVE|ISCOMMUNICATION| CREATEDDATE|CREATEDUSER| UPDATEDDATE|UPDATEDUSER|REASONCODE|ZIP2| C_ACCNO|CUSTOMERID|ACCOUNTGROUPID|PREPAIDACCOUNTSTATUSID|PREPAIDACCOUNTSTATUSDATE| SOURCEOFENTRY|REVENUECATEGORYID|VEHICLENUMBER|VEHICLECLASS| SERIALNO| HEXTAGID| TAGSTATUS| TAGSTARTEFFDATE| TAGENDEFFDATE|ISTAGBLACKLISTED|ISBLACKLISTHOLD|RCVERIFICATIONSTATUS| EMAILADDRESS| PHONENUMBER| CCreatedDate|CCreatedUser| CUpdatedDate|CUpdatedUser| HISTID|ACTION|ISFEEWAIVER|FEEWAIVERPASSTYPE|VEHICLEIMGVERIFICATIONSTATUS|TAGTID|ISREVENUERECHARGE| rn|
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
| 41| Mailing|B309 PROGRESSIVE ...| SECTOR-6| GHANSOLI|NAVI MUMBAI| MH| IND|400701| true| true|2013-06-07 12:55:...|bhagwadapos|2013-06-07 12:55:...|bhagwadapos| null|null|10003014| 20000001| 15| 3079| 2015-09-16 14:58:...|RegularRetailer| 75| MH43AJ411| 4|206158433290|91890704803000000C0A| TAGINACTIVE|2014-08-08 14:24:...|2015-09-16 15:21:...| true| false| null|shankarn75@rediff...|9004419178 |2013-06-07 12:56:...| bhagwadapos|2015-09-16 15:06:...|BLTagProcess| 16257|UPDATE| null| null| null| null| null| 1|
| 86| Mailing|FLAT NO 11 TANUSH...| SAI CHOWK| | PUNE| MH| IND|411027| true| true|2013-06-07 13:11:...|bhagwadapos|2013-06-07 13:11:...|bhagwadapos| null|null|10003015| 20000002| 16| 2079| 2013-06-07 13:14:...|RegularRetailer| 75| MH12GZ3392| 4|137438955875|91890704802000000963| Assigned|2013-06-07 13:17:...|2018-06-06 23:59:...| false| false| null|hiteshmpatil@gmai...|9823131243 |2013-06-07 13:15:...| bhagwadapos|2013-06-07 13:15:...| bhagwadapos| 3|INSERT| null| null| null| null| null| 1|
| 42| Mailing|at-susmma sadan ...|kotak vallay pa...| | valsad| GJ| IND|396001| true| true|2013-06-07 14:28:...|bhagwadapos|2013-06-07 14:28:...|bhagwadapos| null|null|10003016| 20000003| 17| 2131| 2014-11-24 02:30:...|RegularRetailer| 75| GJ15Z8173| 9|137438955877|91890704802000000965| TAGINACTIVE|2013-06-07 14:46:...|2014-11-24 02:52:...| true| false| null|bhagwada.irb@gmai...|8652836666 |2013-06-07 14:31:...| bhagwadapos|2014-11-24 02:37:...|BLTagProcess| 7747|UPDATE| null| null| null| null| null| 1|
| 43| Mailing|Flat No 1, Buildi...| Near Shivaji Pak| | Dadar| MH| IND|400002| true| true|2013-06-13 12:48:...| charotipos|2013-06-13 12:48:...| charotipos| null|null|10003018| 20000004| 19| 2131| 2014-11-24 02:30:...|RegularRetailer| 75| MH05AM902| 11|137438955473|918907048020000007D1| TAGINACTIVE|2013-06-13 13:15:...|2014-11-24 02:51:...| true| false| null|kelkar.suhas@gmai...|9821032045 |2013-06-13 12:50:...| charotipos|2014-11-24 02:36:...|BLTagProcess| 7700|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20009020| 20| 2079| 2016-05-25 18:22:...| Internal| 75| GJ15CF7747| 4| 68719486473|91890704801000002609| ASSIGNED|2016-05-25 18:46:...| 2041-05-25 23:59:59| false| null| 2083|kaviwala@desaicon...|9879110770 |2016-05-25 18:22:...| 263858|2017-02-27 11:35:...| HUSSAIN|2064466|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20000008| 20| 2129| 2016-02-16 02:40:...|RegularRetailer| 75| GJ15CB727| 4|137438955936|918907048020000009A0|RETURNEDDAMAGED|2013-06-17 12:36:...|2016-02-08 17:12:...| true| false| null|kaviwala@desaicon...|9879110770 |2013-06-17 12:26:...| bhagwadapos|2016-02-16 02:46:...|BatchProcess| 29254|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20001223| 20| 2079| 2014-06-06 14:52:...| AgentPOS| 75| GJ15CA7837| 4|137438956220|91890704802000000ABC| ASSIGNED|2014-06-06 14:57:...| 2039-06-06 23:59:59| false| null| 2083|kaviwala@desaicon...|9879110770 |2014-06-06 15:00:...| bhagwadapos|2017-02-27 11:35:...| HUSSAIN|2064457|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20029961| 20| 2079| 2016-07-28 16:27:...| Internal| 75| GJ15CD7387| 4| 68719511515|918907048010000087DB| ASSIGNED|2016-07-28 19:21:...| 2041-07-28 23:59:59| false| null| 2083|kaviwala@desaicon...|9879110770 |2016-07-28 16:27:...| 280603|2017-02-07 17:24:...| HUSSAIN|1607128|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20001557| 20| 2079| 2014-10-01 18:22:...| Internal| 75| GJ15CB9601| 4| 68719479744|91890704801000000BC0| ASSIGNED|2016-05-05 16:45:...| 2041-05-05 23:59:59| false| null| 2083|kaviwala@desaicon...|9879110770 |2014-10-01 18:33:...| 263858|2017-02-27 11:35:...| HUSSAIN|2064460|UPDATE| null| null| null| null| null| 1|
| 87| Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...| | VALSAD| GJ| IND|396035| true| true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos| null|null|10003019| 20000933| 20| 2079| 2014-02-12 13:52:...|RegularRetailer| 75| MH02DG7774| 4|137438956174|91890704802000000A8E| Assigned|2014-02-12 13:49:...|2019-02-11 23:59:...| false| false| null|kaviwala@desaicon...|9879110770 |2014-02-12 13:43:...| bhagwadapos|2017-02-27 11:35:...| HUSSAIN|2064453|UPDATE| null| null| null| null| null| 1|
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
only showing top 10 rows
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`ACCOUNTNO`' given input columns: [REASONCODE, EMAILADDRESS, ADDRESSTYPE, VEHICLEIMGVERIFICATIONSTATUS, CUpdatedUser, ISBLACKLISTHOLD, TAGENDEFFDATE, ZIP2, VEHICLENUMBER, ISFEEWAIVER, ZIP1, FEEWAIVERPASSTYPE, UPDATEDUSER, PREPAIDACCOUNTSTATUSID, C_ACCNO, PHONENUMBER, ISACTIVE, SERIALNO, ACTION, CREATEDUSER, rn, ADDRESSLINE2, HISTID, PREPAIDACCOUNTSTATUSDATE, TAGTID, CCreatedDate, ADDRESSLINE3, CUpdatedDate, ISCOMMUNICATION, ADDRESSLINE1, ACCOUNTGROUPID, CITY, COUNTRY, CUSTADDRESSID, CREATEDDATE, CUSTOMERID, VEHICLECLASS, TAGSTARTEFFDATE, REVENUECATEGORYID, CCreatedUser, ISTAGBLACKLISTED, RCVERIFICATIONSTATUS, STATE, HEXTAGID, ISREVENUERECHARGE, UPDATEDDATE, SOURCEOFENTRY, TAGSTATUS]; line 1 pos 730;
'Sort ['ACCOUNTNO ASC NULLS FIRST], true
+- 'Aggregate ['ACCOUNTNO], [C_ACCNO#100L AS ACCOUNTNO#207L, collect_set(named_struct(VEHICLENUMBER, VEHICLENUMBER#7, CUSTOMERID, CUSTOMERID#0L, ACCOUNTGROUPID, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY, SOURCEOFENTRY#5, REVENUECATEGORYID, REVENUECATEGORYID#6, VEHICLECLASS, VEHICLECLASS#8, SERIALNO, SERIALNO#9, HEXTAGID, HEXTAGID#10, TAGSTATUS, TAGSTATUS#11, TAGSTARTEFFDATE, TAGSTARTEFFDATE#12, ... 30 more fields), 0, 0) AS VEHICLE#208, 'collect_set(named_struct(CUSTADDRESSID, CUSTADDRESSID#61L, ADDRESSTYPE, ADDRESSTYPE#63, ADDRESSLINE1, ADDRESSLINE1#64, ADDRESSLINE2, ADDRESSLINE2#65, ADDRESSLINE3, ADDRESSLINE3#66, CITY, CITY#67, NamePlaceholder, 'O.STATE, COUNTRY, COUNTRY#69, ZIP1, ZIP1#70, ISACTIVE, ISACTIVE#71, ISCOMMUNICATION, ISCOMMUNICATION#72, CREATEDDATE, CREATEDDATE#73, ... 10 more fields)) AS ADDRESS#209]
+- SubqueryAlias joined_acc_add
+- Sort [C_ACCNO#100L ASC NULLS FIRST], true
+- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 24 more fields]
+- Filter (rn#105 = 1)
+- SubqueryAlias R
+- SubqueryAlias RowNumberedAccounts
+- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 24 more fields]
+- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 25 more fields]
+- Window [row_number() windowspecdefinition(VEHICLENUMBER#7, TAGSTARTEFFDATE#12 DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#105], [VEHICLENUMBER#7], [TAGSTARTEFFDATE#12 DESC NULLS LAST]
+- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, ACCOUNTNO#1L AS C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 23 more fields]
+- Join Inner, (ACCOUNTNO#1L = ACCOUNTNO#62L)
:- SubqueryAlias c
: +- SubqueryAlias customer_account
: +- Relation[CUSTOMERID#0L,ACCOUNTNO#1L,ACCOUNTGROUPID#2,PREPAIDACCOUNTSTATUSID#3,PREPAIDACCOUNTSTATUSDATE#4,SOURCEOFENTRY#5,REVENUECATEGORYID#6,VEHICLENUMBER#7,VEHICLECLASS#8,SERIALNO#9,HEXTAGID#10,TAGSTATUS#11,TAGSTARTEFFDATE#12,TAGENDEFFDATE#13,ISTAGBLACKLISTED#14,ISBLACKLISTHOLD#15,RCVERIFICATIONSTATUS#16,EMAILADDRESS#17,PHONENUMBER#18,CREATEDDATE#19,CREATEDUSER#20,UPDATEDDATE#21,UPDATEDUSER#22,HISTID#23L,... 6 more fields] JDBCRelation([HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]) [numPartitions=1]
+- SubqueryAlias o
+- SubqueryAlias customer_address
+- Relation[CUSTADDRESSID#61L,ACCOUNTNO#62L,ADDRESSTYPE#63,ADDRESSLINE1#64,ADDRESSLINE2#65,ADDRESSLINE3#66,CITY#67,STATE#68,COUNTRY#69,ZIP1#70,ISACTIVE#71,ISCOMMUNICATION#72,CREATEDDATE#73,CREATEDUSER#74,UPDATEDDATE#75,UPDATEDUSER#76,REASONCODE#77,ZIP2#78] JDBCRelation([PLAY].TP_CUSTOMER_ADDRESSES) [numPartitions=1]
18/08/07 13:31:26 INFO ShutdownHookManager: Shutdown hook called
If I manage to get this result, I will be able to write the next SparkSQL query in the Spark application to shape the JSON, as @RameshMaharjan said.
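For reference, the AnalysisException above points at two problems in query2: the struct still references O.STATE, but the alias O does not exist inside joined_acc_add (the plan shows the unresolved 'O.STATE), and GROUP BY ACCOUNTNO / ORDER BY ACCOUNTNO name a column the temp table only exposes as C_ACCNO (the error lists C_ACCNO among the input columns). A hedged correction, with the column lists shortened here; the remaining columns of the original query stay unchanged:

```scala
// Corrected form of query2: STATE instead of O.STATE (the alias O is gone
// after the CTE), and C_ACCNO in GROUP BY / ORDER BY (joined_acc_add has no
// column named ACCOUNTNO). Column lists are abbreviated for illustration.
val query2Fixed =
  """SELECT C_ACCNO AS ACCOUNTNO,
    |  collect_set(struct(VEHICLENUMBER, CUSTOMERID, VEHICLECLASS)) AS VEHICLE,
    |  collect_set(struct(CUSTADDRESSID, ADDRESSTYPE, CITY, STATE, COUNTRY)) AS ADDRESS
    |FROM joined_acc_add
    |GROUP BY C_ACCNO
    |ORDER BY C_ACCNO""".stripMargin
```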
Posted on 2018-08-08 21:16:05
I got it working, and credit goes to @Ramesh Maharjan, who suggested this approach to me in the comments. Below I show some code snippets describing how I implemented it successfully.
val db = "databasename"
val table1 = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
val custinfo1 = "[confidential].TP_CUSTOMER_ADDRESSES"
val custinfo2 = "[confidential].TP_CUSTOMER_BUSINESS"
val custinfo3 = "[confidential].TP_CUSTOMER_EMAILS"
val custinfo4 = "[confidential].TP_CUSTOMER_LOGINS"
val custinfo5 = "[confidential].TP_CUSTOMER_PHONES"
I registered the main table and each of the customer-info tables as temp tables and joined them all together, using LEFT joins.
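The registration step itself is not shown in the answer; presumably each table was read over JDBC and registered as a temp table, along the lines of the question's earlier code:

```scala
// Presumed loading/registration step, mirroring the earlier code in the
// question. The view names on the left are assumptions.
val tables = Seq(
  "customer_account"  -> table1,
  "customer_address"  -> custinfo1,
  "customer_business" -> custinfo2,
  "customer_emails"   -> custinfo3,
  "customer_logins"   -> custinfo4,
  "customer_phones"   -> custinfo5
)
for ((view, dbTable) <- tables) {
  sqlContext.read.format("jdbc")
    .options(Map("url" -> jdbcSqlConnStr, "dbtable" -> dbTable))
    .load()
    .registerTempTable(view)
}
```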
So the first join query I wrote:
val query1 = "WITH RowNumberedAccounts AS(select O.CUSTADDRESSID ,O.ADDRESSTYPE ,O.ADDRESSLINE1 ,O.ADDRESSLINE2 ,O.ADDRESSLINE3 ,O.CITY,O.STATE ,O.COUNTRY ,O.ZIP1 ,O.ISACTIVE ,O.ISCOMMUNICATION ,O.CREATEDDATE ,O.CREATEDUSER ,O.UPDATEDDATE ,O.UPDATEDUSER ,O.REASONCODE ,O.ZIP2 ,C.ACCOUNTNO as C_ACCNO ,C.CUSTOMERID ,C.ACCOUNTGROUPID ,C.PREPAIDACCOUNTSTATUSID ,C.PREPAIDACCOUNTSTATUSDATE ,C.SOURCEOFENTRY ,C.REVENUECATEGORYID ,C.VEHICLENUMBER ,C.VEHICLECLASS ,C.SERIALNO ,C.HEXTAGID ,C.TAGSTATUS ,C.TAGSTARTEFFDATE ,C.TAGENDEFFDATE ,C.ISTAGBLACKLISTED ,C.ISBLACKLISTHOLD ,C.RCVERIFICATIONSTATUS ,C.EMAILADDRESS ,C.PHONENUMBER ,C.CREATEDDATE AS CCreatedDate ,C.CREATEDUSER AS CCreatedUser ,C.UPDATEDDATE AS CUpdatedDate ,C.UPDATEDUSER AS CUpdatedUser ,C.HISTID ,C.ACTION ,C.ISFEEWAIVER ,C.FEEWAIVERPASSTYPE ,C.VEHICLEIMGVERIFICATIONSTATUS ,C.TAGTID ,C.ISREVENUERECHARGE , ROW_NUMBER() OVER ( PARTITION BY C.VEHICLENUMBER ORDER BY C.TAGSTARTEFFDATE DESC) AS rn from customer_account c LEFT join customer_address o on c.ACCOUNTNO = o.ACCOUNTNO)SELECT R.* FROM RowNumberedAccounts AS R WHERE rn = 1 order by C_ACCNO"
After this, I joined the remaining tables with further queries in the same way; the final SparkSQL query is:
val queryF = "SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE,CCreatedDate,CCreatedUser,CUpdatedDate,CUpdatedUser)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,STATE ,COUNTRY ,ZIP1 ,ISACTIVE ,ISCOMMUNICATION ,CREATEDDATE ,CREATEDUSER ,UPDATEDDATE ,UPDATEDUSER ,REASONCODE ,ZIP2)) as ADDRESS, collect_set(struct(ORGANISATIONNAME,DATEOFINCORPORATION,PANCARDNUMBER,ORGANIZATIONTYPEID)) as BUSINESS, collect_set(struct(CUSTMAILID,EMAILTYPE,EMAIL,ISACTIVE_EMAIL,ISCOMMUNICATION_EMAIL)) as EMAIL, collect_set(struct(LOGINID, USERNAME, PASSWORD, LAST_LOGINDATE, LAST_PWD_MODIFIEDDATE, CURRENT_PWD_EXPIRYDATE, PWD_ATTEMPTS_COUNT, PINNUMBER, ISLOCKED,THEMES,LANGUAGES, STATUSID, USERTYPEID, ROLENAME, SQ_ATTEMPTCOUNT, SQ_LOCKOUTTIME)) as LOGIN, collect_set(struct(CUSTPHONEID, PHONETYPE, PHONENUMBER_PHONES, EXTENTION, ISACTIVE_PHONES, ISCOMMUNICATION_PHONES)) as PHONES FROM joined_acc_phones GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO"
val res0F = sqlContext.sql(queryF.toString)
res0F.show(10)
res0F.coalesce(1).write.json("D:/result01")
https://stackoverflow.com/questions/51653050
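Note that the final query references renamed columns such as EMAIL, ISACTIVE_EMAIL, and PHONENUMBER_PHONES, which suggests the clashing columns of each child table were renamed before the joins (that step is not shown in the answer). A sketch of what it presumably looked like; the DataFrame names emailsDF and phonesDF are assumptions:

```scala
// Presumed disambiguation step: rename columns that clash across tables
// before joining, matching the names used in queryF.
val emailsRenamed = emailsDF
  .withColumnRenamed("EMAILADDRESS", "EMAIL")
  .withColumnRenamed("ISACTIVE", "ISACTIVE_EMAIL")
  .withColumnRenamed("ISCOMMUNICATION", "ISCOMMUNICATION_EMAIL")
val phonesRenamed = phonesDF
  .withColumnRenamed("PHONENUMBER", "PHONENUMBER_PHONES")
  .withColumnRenamed("ISACTIVE", "ISACTIVE_PHONES")
  .withColumnRenamed("ISCOMMUNICATION", "ISCOMMUNICATION_PHONES")
```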