
Join in SparkSQL to design a JSON document

Stack Overflow user
Asked 2018-08-02 11:56:48
Answers: 1, Views: 382, Followers: 0, Votes: 2

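I am reading tables from SQL Server, processing them in a Spark application, converting the result to a JSON file, and then storing it in MongoDB. I have produced one JSON file in which ACCOUNTNO is unique. One ACCOUNTNO may have one or more VEHICLENUMBER values, so I created an array for each unique ACCOUNTNO: each VEHICLENUMBER goes into its own object inside the VEHICLE array.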

I used the following query:

val res00 = sparksessionobject.sqlContext.sql("SELECT ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE FROM (SELECT *, row_number() OVER (PARTITION BY VEHICLENUMBER ORDER BY TAGSTARTEFFDATE DESC) AS rn FROM tp_customer_account) tmp WHERE rn=1 GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO")
res00.coalesce(1).write.json("D:/res39")
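
The query above does two things: it keeps only the most recent row per VEHICLENUMBER (via row_number), and then collects one struct per vehicle into an array for each ACCOUNTNO. For readers who prefer the DataFrame API, the same two steps might be sketched as follows. This is a minimal sketch on made-up in-memory rows with only two of the vehicle columns; the object name VehicleArraySketch, the helper nestVehicles, the sample values, and the local SparkSession are assumptions for illustration, not part of the original job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_set, row_number, struct}

// Hypothetical helper object, not part of the original application.
object VehicleArraySketch {

  // Step 1: keep the newest row per VEHICLENUMBER.
  // Step 2: nest the surviving rows into a VEHICLE array per ACCOUNTNO.
  def nestVehicles(accounts: DataFrame): DataFrame = {
    val newestFirst = Window
      .partitionBy("VEHICLENUMBER")
      .orderBy(col("TAGSTARTEFFDATE").desc)

    accounts
      .withColumn("rn", row_number().over(newestFirst))
      .where(col("rn") === 1)
      .groupBy("ACCOUNTNO")
      .agg(collect_set(struct("VEHICLENUMBER", "TAGSTARTEFFDATE")).as("VEHICLE"))
      .orderBy("ACCOUNTNO")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("VehicleArraySketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows; the second row is an older record of the same vehicle.
    val accounts = Seq(
      (10003019L, "GJ15CB727", "2013-06-17 12:36:00"),
      (10003019L, "GJ15CB727", "2012-01-01 00:00:00"),
      (10003019L, "MH02DG7774", "2014-02-12 13:49:00"),
      (10003020L, "MH01AX5658", "2013-06-16 12:27:34")
    ).toDF("ACCOUNTNO", "VEHICLENUMBER", "TAGSTARTEFFDATE")

    nestVehicles(accounts).show(false)
    spark.stop()
  }
}
```

In the real job the remaining vehicle columns would simply be added to the struct(...) call, and the result written with write.json as in the original code.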

My JSON file looks like this:

ACCOUNTNO    :    10003018
VEHICLE    :    Array
                0    :    Object

ACCOUNTNO    :    10003019
VEHICLE    :    Array
                0    :    Object
                1    :    Object
                2    :    Object

ACCOUNTNO    :    10003020
VEHICLE    :    Array
                0    :    Object
                    VEHICLENUMBER    :    "MH01AX5658"
                    CUSTOMERID    :    20000006
                    ACCOUNTGROUPID    :    21
                    PREPAIDACCOUNTSTATUSID    :    3079
                    PREPAIDACCOUNTSTATUSDATE    :    2015-09-16 14:58:27.593
                    SOURCEOFENTRY    :    "RegularRetailer"
                    REVENUECATEGORYID    :    75
                    VEHICLECLASS    :    "4"
                    SERIALNO    :    "137438955930"
                    HEXTAGID    :    "9189070480200000099A"
                    TAGSTATUS    :    "TAGINACTIVE"
                    TAGSTARTEFFDATE    :    2013-06-16 12:27:34.997
                    TAGENDEFFDATE    :    2015-09-16 15:21:39.960
                    ISTAGBLACKLISTED    :    true
                    ISBLACKLISTHOLD    :    false
                    EMAILADDRESS    :    "prabjyotsingh.kalsi@yahoo.com ..."
                    PHONENUMBER    :    "09909913811 "
                    CREATEDDATE    :    2013-06-16 12:12:37.327
                    CREATEDUSER    :    "bhagwadapos"
                    UPDATEDDATE    :    2015-09-16 15:06:39.960
                    UPDATEDUSER    :    "BLTagProcess"

Another view of the same JSON file, this time in the MongoDB database using pretty():

{
        "_id" : ObjectId("5b5ac17b0fef3110a8eb2319"),
        "ACCOUNTNO" : NumberLong(10003014),
        "VEHICLE" : [
                {
                        "VEHICLENUMBER" : "MH43AJ411",
                        "CUSTOMERID" : NumberLong(20000001),
                        "ACCOUNTGROUPID" : 15,
                        "PREPAIDACCOUNTSTATUSID" : 3079,
                        "PREPAIDACCOUNTSTATUSDATE" : ISODate("2015-09-16T09:28:27.500Z"),
                        "SOURCEOFENTRY" : "RegularRetailer",
                        "REVENUECATEGORYID" : 75,
                        "VEHICLECLASS" : "4",
                        "SERIALNO" : "206158433290",
                        "HEXTAGID" : "91890704803000000C0A",
                        "TAGSTATUS" : "TAGINACTIVE",
                        "TAGSTARTEFFDATE" : ISODate("2014-08-08T08:54:12.227Z"),
                        "TAGENDEFFDATE" : ISODate("2015-09-16T09:51:42.437Z"),
                        "ISTAGBLACKLISTED" : true,
                        "ISBLACKLISTHOLD" : false,
                        "EMAILADDRESS" : "shankarn75@rediffmail.com                                                                                                                                                                                     ",
                        "PHONENUMBER" : "9004419178     ",
                        "CREATEDDATE" : ISODate("2013-06-07T07:26:16.650Z"),
                        "CREATEDUSER" : "bhagwadapos",
                        "UPDATEDDATE" : ISODate("2015-09-16T09:36:42.437Z"),
                        "UPDATEDUSER" : "BLTagProcess"
                }
        ]
}
{
        "_id" : ObjectId("5b5ac17b0fef3110a8eb231a"),
        "ACCOUNTNO" : NumberLong(10003015),
        "VEHICLE" : [
                {
                        "VEHICLENUMBER" : "MH12GZ3392",
                        "CUSTOMERID" : NumberLong(20000002),
                        "ACCOUNTGROUPID" : 16,
                        "PREPAIDACCOUNTSTATUSID" : 2079,
                        "PREPAIDACCOUNTSTATUSDATE" : ISODate("2013-06-07T07:44:13.903Z"),
                        "SOURCEOFENTRY" : "RegularRetailer",
                        "REVENUECATEGORYID" : 75,
                        "VEHICLECLASS" : "4",
                        "SERIALNO" : "137438955875",
                        "HEXTAGID" : "91890704802000000963",
                        "TAGSTATUS" : "Assigned",
                        "TAGSTARTEFFDATE" : ISODate("2013-06-07T07:47:11.550Z"),
                        "TAGENDEFFDATE" : ISODate("2018-06-06T18:29:59.997Z"),
                        "ISTAGBLACKLISTED" : false,
                        "ISBLACKLISTHOLD" : false,
                        "EMAILADDRESS" : "hiteshmpatil@gmail.com                                                                                                                                                                                        ",
                        "PHONENUMBER" : "9823131243     ",
                        "CREATEDDATE" : ISODate("2013-06-07T07:45:29.337Z"),
                        "CREATEDUSER" : "bhagwadapos",
                        "UPDATEDDATE" : ISODate("2013-06-07T07:45:29.337Z"),
                        "UPDATEDUSER" : "bhagwadapos"
                }
        ]
}

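I have 3 more tables in SQL Server: an emails table, a logins table, and a phones table. All 3 tables contain ACCOUNTNO, so for each ACCOUNTNO I have to insert the matching rows from each of the three tables into the single JSON file.

That means I need to add 3 more arrays to the JSON file, after ACCOUNTNO and before the VEHICLE array, as shown below: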

ACCOUNTNO    :    10003018
Email      :    Array
                0    :    Object
Login      :    Array
                0    :    Object
Phones    :    Array
                0    :    Object
VEHICLE    :    Array
                0    :    Object

Columns and one sample row of the TP_CUSTOMER_LOGINS table:

LOGINID ACCOUNTNO   USERNAME    PASSWORD    LAST_LOGINDATE  LAST_PWD_MODIFIEDDATE   CURRENT_PWD_EXPIRYDATE  PWD_ATTEMPTS_COUNT  PINNUMBER   ISLOCKED    CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER THEMES  LANGUAGES   STATUSID    USERTYPEID  ROLENAME    SQ_ATTEMPTCOUNT SQ_LOCKOUTTIME
        41118   10076338    user1212    passpasspasspass    27:23.2 20:29.0 20:12.8 0       0   20:29.0 deenkapoor  27:39.5 deenkapoor  Maroon  en-IN   2111    2       NULL    NULL    

Columns and one sample row of the TP_CUSTOMER_EMAILS table:

CUSTMAILID  ACCOUNTNO   EMAILTYPE   EMAILADDRESS    ISACTIVE    ISCOMMUNICATION CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER
38404   10078435    PrimaryEmail    something.ok@gmail.com  1   1   26:36.3 System  26:36.3 System  

Columns and one sample row of the TP_CUSTOMER_PHONES table:

CUSTPHONEID ACCOUNTNO   PHONETYPE   PHONENUMBER EXTENTION   ISACTIVE    ISCOMMUNICATION CREATEDDATE CREATEDUSER UPDATEDDATE UPDATEDUSER
91831   10078435    MobileNo    9999999999      1   1   26:36.3 System  26:36.3 System

I have to do this with joins in a SparkSQL query. I need help. Thanks.

Update: I tried joining two tables. I wrote a SQL query that works in SQL Server, but it does not work in SparkSQL.

Code:

package com.issuer.pack3.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


object sqltojson {

  def main(args:Array[String])
    {
      System.setProperty("hadoop.home.dir", "C:/winutil/")

      val db = "ISSUER"
      val table1 = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
      val custinfo1 = "[PLAY].TP_CUSTOMER_ADDRESSES"


      val conf = new SparkConf().setAppName("SQLtoJSON").setMaster("local[*]")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val jdbcSqlConnStr = s"jdbc:sqlserver://192.168.70.15;databaseName=$db;user=bhaskar;password=welcome123;"      
      val jdbcDbTable1 = table1
      val jdbcDbTable2 = custinfo1
      val jdbcDF1 = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr,"dbtable" -> jdbcDbTable1)).load()
      val jdbcDF2 = sqlContext.read.format("jdbc").options(Map("url" -> jdbcSqlConnStr,"dbtable" -> jdbcDbTable2)).load()
    //jdbcDF.show(10)
    //jdbcDF.printSchema


     jdbcDF1.registerTempTable("customer_account")
     jdbcDF2.registerTempTable("customer_address")
     val query1 = "WITH RowNumberedAccounts AS( select  O.CUSTADDRESSID ,O.ADDRESSTYPE ,O.ADDRESSLINE1 ,O.ADDRESSLINE2 ,O.ADDRESSLINE3 ,O.CITY,O.STATE  ,O.COUNTRY  ,O.ZIP1  ,O.ISACTIVE  ,O.ISCOMMUNICATION  ,O.CREATEDDATE  ,O.CREATEDUSER  ,O.UPDATEDDATE  ,O.UPDATEDUSER  ,O.REASONCODE ,O.ZIP2 ,C.ACCOUNTNO         as C_ACCNO        ,C.CUSTOMERID        ,C.ACCOUNTGROUPID        ,C.PREPAIDACCOUNTSTATUSID        ,C.PREPAIDACCOUNTSTATUSDATE        ,C.SOURCEOFENTRY        ,C.REVENUECATEGORYID        ,C.VEHICLENUMBER        ,C.VEHICLECLASS        ,C.SERIALNO        ,C.HEXTAGID        ,C.TAGSTATUS        ,C.TAGSTARTEFFDATE        ,C.TAGENDEFFDATE        ,C.ISTAGBLACKLISTED        ,C.ISBLACKLISTHOLD        ,C.RCVERIFICATIONSTATUS        ,C.EMAILADDRESS        ,C.PHONENUMBER        ,C.CREATEDDATE AS CCreatedDate  ,C.CREATEDUSER AS CCreatedUser        ,C.UPDATEDDATE AS CUpdatedDate        ,C.UPDATEDUSER AS CUpdatedUser        ,C.HISTID        ,C.ACTION        ,C.ISFEEWAIVER        ,C.FEEWAIVERPASSTYPE        ,C.VEHICLEIMGVERIFICATIONSTATUS        ,C.TAGTID        ,C.ISREVENUERECHARGE        , ROW_NUMBER() OVER (            PARTITION BY C.VEHICLENUMBER            ORDER BY C.TAGSTARTEFFDATE DESC) AS rn    from        customer_account c        INNER join customer_address o on c.ACCOUNTNO = o.ACCOUNTNO)SELECT    R.* FROM    RowNumberedAccounts AS R WHERE    rn = 1 order by C_ACCNO"
     val res00 = sqlContext.sql(query1.toString) 
     res00.registerTempTable("joined_acc_add")
     res00.show(10)
     val query2 = "SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,O.STATE  ,COUNTRY  ,ZIP1  ,ISACTIVE  ,ISCOMMUNICATION  ,CREATEDDATE  ,CREATEDUSER  ,UPDATEDDATE  ,UPDATEDUSER  ,REASONCODE ,ZIP2)) as ADDRESS FROM joined_acc_add GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO"
     val res01 = sqlContext.sql(query2.toString)
     res01.coalesce(1).write.json("D:/result01")







//###########################
//  jdbcDF.withColumn("VEHICLE",struct("VEHICLENUMBER","CUSTOMERID")) // withColumn for Add or replace Columns, struct for Creates a new struct column.
//  .select("VEHICLE","ACCOUNTNO")
//  .groupBy("ACCOUNTNO")
//  .agg(collect_set("VEHICLE").as("VEHICLE")). //collect_set(Column e) It's an Aggregate function: returns a set of objects with duplicate elements eliminated.
//  orderBy("ACCOUNTNO").
//  coalesce(1).write.json("D:/res10")


    }
  }

Error stack trace:

18/08/07 13:31:25 INFO DAGScheduler: Job 0 finished: show at sqltojson.scala:40, took 99.958582 s
18/08/07 13:31:25 INFO CodeGenerator: Code generated in 11.907295 ms
18/08/07 13:31:25 INFO SparkSqlParser: Parsing command: SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,CREATEDDATE,CREATEDUSER,UPDATEDDATE,UPDATEDUSER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,O.STATE  ,COUNTRY  ,ZIP1  ,ISACTIVE  ,ISCOMMUNICATION  ,CREATEDDATE  ,CREATEDUSER  ,UPDATEDDATE  ,UPDATEDUSER  ,REASONCODE ,ZIP2)) as ADDRESS FROM joined_acc_add GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
|CUSTADDRESSID|ADDRESSTYPE|        ADDRESSLINE1|        ADDRESSLINE2|ADDRESSLINE3|       CITY|STATE|COUNTRY|  ZIP1|ISACTIVE|ISCOMMUNICATION|         CREATEDDATE|CREATEDUSER|         UPDATEDDATE|UPDATEDUSER|REASONCODE|ZIP2| C_ACCNO|CUSTOMERID|ACCOUNTGROUPID|PREPAIDACCOUNTSTATUSID|PREPAIDACCOUNTSTATUSDATE|  SOURCEOFENTRY|REVENUECATEGORYID|VEHICLENUMBER|VEHICLECLASS|    SERIALNO|            HEXTAGID|      TAGSTATUS|     TAGSTARTEFFDATE|       TAGENDEFFDATE|ISTAGBLACKLISTED|ISBLACKLISTHOLD|RCVERIFICATIONSTATUS|        EMAILADDRESS|    PHONENUMBER|        CCreatedDate|CCreatedUser|        CUpdatedDate|CUpdatedUser| HISTID|ACTION|ISFEEWAIVER|FEEWAIVERPASSTYPE|VEHICLEIMGVERIFICATIONSTATUS|TAGTID|ISREVENUERECHARGE| rn|
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
|           41|    Mailing|B309 PROGRESSIVE ...|            SECTOR-6|    GHANSOLI|NAVI MUMBAI|   MH|    IND|400701|    true|           true|2013-06-07 12:55:...|bhagwadapos|2013-06-07 12:55:...|bhagwadapos|      null|null|10003014|  20000001|            15|                  3079|    2015-09-16 14:58:...|RegularRetailer|               75|    MH43AJ411|           4|206158433290|91890704803000000C0A|    TAGINACTIVE|2014-08-08 14:24:...|2015-09-16 15:21:...|            true|          false|                null|shankarn75@rediff...|9004419178     |2013-06-07 12:56:...| bhagwadapos|2015-09-16 15:06:...|BLTagProcess|  16257|UPDATE|       null|             null|                        null|  null|             null|  1|
|           86|    Mailing|FLAT NO 11 TANUSH...|           SAI CHOWK|            |       PUNE|   MH|    IND|411027|    true|           true|2013-06-07 13:11:...|bhagwadapos|2013-06-07 13:11:...|bhagwadapos|      null|null|10003015|  20000002|            16|                  2079|    2013-06-07 13:14:...|RegularRetailer|               75|   MH12GZ3392|           4|137438955875|91890704802000000963|       Assigned|2013-06-07 13:17:...|2018-06-06 23:59:...|           false|          false|                null|hiteshmpatil@gmai...|9823131243     |2013-06-07 13:15:...| bhagwadapos|2013-06-07 13:15:...| bhagwadapos|      3|INSERT|       null|             null|                        null|  null|             null|  1|
|           42|    Mailing|at-susmma  sadan ...|kotak  vallay  pa...|            |     valsad|   GJ|    IND|396001|    true|           true|2013-06-07 14:28:...|bhagwadapos|2013-06-07 14:28:...|bhagwadapos|      null|null|10003016|  20000003|            17|                  2131|    2014-11-24 02:30:...|RegularRetailer|               75|    GJ15Z8173|           9|137438955877|91890704802000000965|    TAGINACTIVE|2013-06-07 14:46:...|2014-11-24 02:52:...|            true|          false|                null|bhagwada.irb@gmai...|8652836666     |2013-06-07 14:31:...| bhagwadapos|2014-11-24 02:37:...|BLTagProcess|   7747|UPDATE|       null|             null|                        null|  null|             null|  1|
|           43|    Mailing|Flat No 1, Buildi...|    Near Shivaji Pak|            |      Dadar|   MH|    IND|400002|    true|           true|2013-06-13 12:48:...| charotipos|2013-06-13 12:48:...| charotipos|      null|null|10003018|  20000004|            19|                  2131|    2014-11-24 02:30:...|RegularRetailer|               75|    MH05AM902|          11|137438955473|918907048020000007D1|    TAGINACTIVE|2013-06-13 13:15:...|2014-11-24 02:51:...|            true|          false|                null|kelkar.suhas@gmai...|9821032045     |2013-06-13 12:50:...|  charotipos|2014-11-24 02:36:...|BLTagProcess|   7700|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20009020|            20|                  2079|    2016-05-25 18:22:...|       Internal|               75|   GJ15CF7747|           4| 68719486473|91890704801000002609|       ASSIGNED|2016-05-25 18:46:...| 2041-05-25 23:59:59|           false|           null|                2083|kaviwala@desaicon...|9879110770     |2016-05-25 18:22:...|      263858|2017-02-27 11:35:...|     HUSSAIN|2064466|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20000008|            20|                  2129|    2016-02-16 02:40:...|RegularRetailer|               75|    GJ15CB727|           4|137438955936|918907048020000009A0|RETURNEDDAMAGED|2013-06-17 12:36:...|2016-02-08 17:12:...|            true|          false|                null|kaviwala@desaicon...|9879110770     |2013-06-17 12:26:...| bhagwadapos|2016-02-16 02:46:...|BatchProcess|  29254|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20001223|            20|                  2079|    2014-06-06 14:52:...|       AgentPOS|               75|   GJ15CA7837|           4|137438956220|91890704802000000ABC|       ASSIGNED|2014-06-06 14:57:...| 2039-06-06 23:59:59|           false|           null|                2083|kaviwala@desaicon...|9879110770     |2014-06-06 15:00:...| bhagwadapos|2017-02-27 11:35:...|     HUSSAIN|2064457|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20029961|            20|                  2079|    2016-07-28 16:27:...|       Internal|               75|   GJ15CD7387|           4| 68719511515|918907048010000087DB|       ASSIGNED|2016-07-28 19:21:...| 2041-07-28 23:59:59|           false|           null|                2083|kaviwala@desaicon...|9879110770     |2016-07-28 16:27:...|      280603|2017-02-07 17:24:...|     HUSSAIN|1607128|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20001557|            20|                  2079|    2014-10-01 18:22:...|       Internal|               75|   GJ15CB9601|           4| 68719479744|91890704801000000BC0|       ASSIGNED|2016-05-05 16:45:...| 2041-05-05 23:59:59|           false|           null|                2083|kaviwala@desaicon...|9879110770     |2014-10-01 18:33:...|      263858|2017-02-27 11:35:...|     HUSSAIN|2064460|UPDATE|       null|             null|                        null|  null|             null|  1|
|           87|    Mailing|DESAICONSTRUCTION...|OPP NEW GIDC NH08...|            |     VALSAD|   GJ|    IND|396035|    true|           true|2013-06-15 11:18:...|bhagwadapos|2013-06-15 11:18:...|bhagwadapos|      null|null|10003019|  20000933|            20|                  2079|    2014-02-12 13:52:...|RegularRetailer|               75|   MH02DG7774|           4|137438956174|91890704802000000A8E|       Assigned|2014-02-12 13:49:...|2019-02-11 23:59:...|           false|          false|                null|kaviwala@desaicon...|9879110770     |2014-02-12 13:43:...| bhagwadapos|2017-02-27 11:35:...|     HUSSAIN|2064453|UPDATE|       null|             null|                        null|  null|             null|  1|
+-------------+-----------+--------------------+--------------------+------------+-----------+-----+-------+------+--------+---------------+--------------------+-----------+--------------------+-----------+----------+----+--------+----------+--------------+----------------------+------------------------+---------------+-----------------+-------------+------------+------------+--------------------+---------------+--------------------+--------------------+----------------+---------------+--------------------+--------------------+---------------+--------------------+------------+--------------------+------------+-------+------+-----------+-----------------+----------------------------+------+-----------------+---+
only showing top 10 rows

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`ACCOUNTNO`' given input columns: [REASONCODE, EMAILADDRESS, ADDRESSTYPE, VEHICLEIMGVERIFICATIONSTATUS, CUpdatedUser, ISBLACKLISTHOLD, TAGENDEFFDATE, ZIP2, VEHICLENUMBER, ISFEEWAIVER, ZIP1, FEEWAIVERPASSTYPE, UPDATEDUSER, PREPAIDACCOUNTSTATUSID, C_ACCNO, PHONENUMBER, ISACTIVE, SERIALNO, ACTION, CREATEDUSER, rn, ADDRESSLINE2, HISTID, PREPAIDACCOUNTSTATUSDATE, TAGTID, CCreatedDate, ADDRESSLINE3, CUpdatedDate, ISCOMMUNICATION, ADDRESSLINE1, ACCOUNTGROUPID, CITY, COUNTRY, CUSTADDRESSID, CREATEDDATE, CUSTOMERID, VEHICLECLASS, TAGSTARTEFFDATE, REVENUECATEGORYID, CCreatedUser, ISTAGBLACKLISTED, RCVERIFICATIONSTATUS, STATE, HEXTAGID, ISREVENUERECHARGE, UPDATEDDATE, SOURCEOFENTRY, TAGSTATUS]; line 1 pos 730;
'Sort ['ACCOUNTNO ASC NULLS FIRST], true
+- 'Aggregate ['ACCOUNTNO], [C_ACCNO#100L AS ACCOUNTNO#207L, collect_set(named_struct(VEHICLENUMBER, VEHICLENUMBER#7, CUSTOMERID, CUSTOMERID#0L, ACCOUNTGROUPID, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY, SOURCEOFENTRY#5, REVENUECATEGORYID, REVENUECATEGORYID#6, VEHICLECLASS, VEHICLECLASS#8, SERIALNO, SERIALNO#9, HEXTAGID, HEXTAGID#10, TAGSTATUS, TAGSTATUS#11, TAGSTARTEFFDATE, TAGSTARTEFFDATE#12, ... 30 more fields), 0, 0) AS VEHICLE#208, 'collect_set(named_struct(CUSTADDRESSID, CUSTADDRESSID#61L, ADDRESSTYPE, ADDRESSTYPE#63, ADDRESSLINE1, ADDRESSLINE1#64, ADDRESSLINE2, ADDRESSLINE2#65, ADDRESSLINE3, ADDRESSLINE3#66, CITY, CITY#67, NamePlaceholder, 'O.STATE, COUNTRY, COUNTRY#69, ZIP1, ZIP1#70, ISACTIVE, ISACTIVE#71, ISCOMMUNICATION, ISCOMMUNICATION#72, CREATEDDATE, CREATEDDATE#73, ... 10 more fields)) AS ADDRESS#209]
   +- SubqueryAlias joined_acc_add
      +- Sort [C_ACCNO#100L ASC NULLS FIRST], true
         +- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 24 more fields]
            +- Filter (rn#105 = 1)
               +- SubqueryAlias R
                  +- SubqueryAlias RowNumberedAccounts
                     +- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 24 more fields]
                        +- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 25 more fields]
                           +- Window [row_number() windowspecdefinition(VEHICLENUMBER#7, TAGSTARTEFFDATE#12 DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#105], [VEHICLENUMBER#7], [TAGSTARTEFFDATE#12 DESC NULLS LAST]
                              +- Project [CUSTADDRESSID#61L, ADDRESSTYPE#63, ADDRESSLINE1#64, ADDRESSLINE2#65, ADDRESSLINE3#66, CITY#67, STATE#68, COUNTRY#69, ZIP1#70, ISACTIVE#71, ISCOMMUNICATION#72, CREATEDDATE#73, CREATEDUSER#74, UPDATEDDATE#75, UPDATEDUSER#76, REASONCODE#77, ZIP2#78, ACCOUNTNO#1L AS C_ACCNO#100L, CUSTOMERID#0L, ACCOUNTGROUPID#2, PREPAIDACCOUNTSTATUSID#3, PREPAIDACCOUNTSTATUSDATE#4, SOURCEOFENTRY#5, REVENUECATEGORYID#6, ... 23 more fields]
                                 +- Join Inner, (ACCOUNTNO#1L = ACCOUNTNO#62L)
                                    :- SubqueryAlias c
                                    :  +- SubqueryAlias customer_account
                                    :     +- Relation[CUSTOMERID#0L,ACCOUNTNO#1L,ACCOUNTGROUPID#2,PREPAIDACCOUNTSTATUSID#3,PREPAIDACCOUNTSTATUSDATE#4,SOURCEOFENTRY#5,REVENUECATEGORYID#6,VEHICLENUMBER#7,VEHICLECLASS#8,SERIALNO#9,HEXTAGID#10,TAGSTATUS#11,TAGSTARTEFFDATE#12,TAGENDEFFDATE#13,ISTAGBLACKLISTED#14,ISBLACKLISTHOLD#15,RCVERIFICATIONSTATUS#16,EMAILADDRESS#17,PHONENUMBER#18,CREATEDDATE#19,CREATEDUSER#20,UPDATEDDATE#21,UPDATEDUSER#22,HISTID#23L,... 6 more fields] JDBCRelation([HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]) [numPartitions=1]
                                    +- SubqueryAlias o
                                       +- SubqueryAlias customer_address
                                          +- Relation[CUSTADDRESSID#61L,ACCOUNTNO#62L,ADDRESSTYPE#63,ADDRESSLINE1#64,ADDRESSLINE2#65,ADDRESSLINE3#66,CITY#67,STATE#68,COUNTRY#69,ZIP1#70,ISACTIVE#71,ISCOMMUNICATION#72,CREATEDDATE#73,CREATEDUSER#74,UPDATEDDATE#75,UPDATEDUSER#76,REASONCODE#77,ZIP2#78] JDBCRelation([PLAY].TP_CUSTOMER_ADDRESSES) [numPartitions=1]


18/08/07 13:31:26 INFO ShutdownHookManager: Shutdown hook called

If I can get this result successfully, I will be able to write the SparkSQL query that designs the JSON in the next query of the Spark application, as @RameshMaharjan suggested.
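
A note on reading the trace above. In the printed plan, a leading quote marks a name the analyzer could not resolve, and two such names appear: 'ACCOUNTNO in the Aggregate and Sort nodes, and 'O.STATE inside the second collect_set. The temporary table joined_acc_add exposes the account number as C_ACCNO, and the table alias O exists only inside query1, so O.STATE cannot be resolved in query2. The accepted answer keeps GROUP BY ACCOUNTNO and only drops the O. prefix, which suggests the stale O.STATE reference is what actually blocks analysis here; that is an inference from comparing the two queries, not something the log states. Grouping by C_ACCNO directly avoids relying on alias resolution at all. Note also that query1 renames the account table's audit columns to CCreatedDate, CCreatedUser, CUpdatedDate and CUpdatedUser, so the plain CREATEDDATE, CREATEDUSER, UPDATEDDATE and UPDATEDUSER inside the VEHICLE struct refer to the address table's columns.

The sketch below shows the corrected shape of query2 on a few made-up rows with a reduced column list. The object name GroupByFixSketch, the sample values, and the local SparkSession are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical reproduction, not the original application.
object GroupByFixSketch {

  // Group on the column that really exists in the view (C_ACCNO) and
  // refer to STATE without the O. prefix, since the alias O is not in scope here.
  val fixedQuery: String =
    """SELECT C_ACCNO AS ACCOUNTNO,
      |       collect_set(struct(VEHICLENUMBER)) AS VEHICLE,
      |       collect_set(struct(CITY, STATE)) AS ADDRESS
      |FROM joined_acc_add
      |GROUP BY C_ACCNO
      |ORDER BY ACCOUNTNO""".stripMargin

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Made-up stand-in for the joined result registered as joined_acc_add.
    Seq(
      (10003019L, "GJ15CB727", "VALSAD", "GJ"),
      (10003019L, "MH02DG7774", "VALSAD", "GJ"),
      (10003014L, "MH43AJ411", "NAVI MUMBAI", "MH")
    ).toDF("C_ACCNO", "VEHICLENUMBER", "CITY", "STATE")
      .createOrReplaceTempView("joined_acc_add")

    spark.sql(fixedQuery)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("GroupByFixSketch")
      .getOrCreate()
    run(spark).show(false)
    spark.stop()
  }
}
```

Because the address row repeats for every vehicle of an account, collect_set collapses the duplicates, so account 10003019 ends up with two VEHICLE entries and a single ADDRESS entry.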


1 Answer

Stack Overflow user

Accepted answer

Posted 2018-08-08 21:16:05

I got this working. Credit goes to @Ramesh Maharjan, who suggested the approach in the comments. I implemented it, and the code snippets below show how.

  val db = "databasename"
  val table1 = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
  val custinfo1 = "[confidential].TP_CUSTOMER_ADDRESSES"
  val custinfo2 = "[confidential].TP_CUSTOMER_BUSINESS"
  val custinfo3 = "[confidential].TP_CUSTOMER_EMAILS"
  val custinfo4 = "[confidential].TP_CUSTOMER_LOGINS"
  val custinfo5 = "[confidential].TP_CUSTOMER_PHONES"

I registered the main table and each of the customer-info tables as temporary tables and joined them together, using LEFT joins.

So the first join query I wrote is:

val query1 = "WITH RowNumberedAccounts AS(select  O.CUSTADDRESSID ,O.ADDRESSTYPE ,O.ADDRESSLINE1 ,O.ADDRESSLINE2 ,O.ADDRESSLINE3 ,O.CITY,O.STATE  ,O.COUNTRY  ,O.ZIP1  ,O.ISACTIVE  ,O.ISCOMMUNICATION  ,O.CREATEDDATE  ,O.CREATEDUSER  ,O.UPDATEDDATE  ,O.UPDATEDUSER  ,O.REASONCODE ,O.ZIP2   ,C.ACCOUNTNO         as C_ACCNO        ,C.CUSTOMERID        ,C.ACCOUNTGROUPID        ,C.PREPAIDACCOUNTSTATUSID        ,C.PREPAIDACCOUNTSTATUSDATE        ,C.SOURCEOFENTRY        ,C.REVENUECATEGORYID        ,C.VEHICLENUMBER        ,C.VEHICLECLASS        ,C.SERIALNO        ,C.HEXTAGID        ,C.TAGSTATUS        ,C.TAGSTARTEFFDATE        ,C.TAGENDEFFDATE        ,C.ISTAGBLACKLISTED        ,C.ISBLACKLISTHOLD        ,C.RCVERIFICATIONSTATUS        ,C.EMAILADDRESS        ,C.PHONENUMBER        ,C.CREATEDDATE AS CCreatedDate  ,C.CREATEDUSER AS CCreatedUser        ,C.UPDATEDDATE AS CUpdatedDate        ,C.UPDATEDUSER AS CUpdatedUser        ,C.HISTID        ,C.ACTION        ,C.ISFEEWAIVER        ,C.FEEWAIVERPASSTYPE        ,C.VEHICLEIMGVERIFICATIONSTATUS        ,C.TAGTID        ,C.ISREVENUERECHARGE        , ROW_NUMBER() OVER (            PARTITION BY C.VEHICLENUMBER            ORDER BY C.TAGSTARTEFFDATE DESC) AS rn    from        customer_account c        LEFT join customer_address o on c.ACCOUNTNO = o.ACCOUNTNO)SELECT    R.* FROM    RowNumberedAccounts AS R WHERE    rn = 1 order by C_ACCNO"
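
Compared with the query in the question, the join here is a LEFT join rather than an INNER join. With an inner join, an account that has no row in customer_address disappears from the result entirely; with a left join it is kept, with NULLs in the address columns. The difference can be seen on a couple of made-up rows. This is a minimal sketch: the object name LeftJoinSketch, the reduced column lists, and the sample values (including the account 10003099 that has no address) are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demonstration, not part of the original application.
object LeftJoinSketch {

  // Returns (row count with INNER JOIN, row count with LEFT JOIN).
  def joinCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Two accounts, but only the first one has an address row.
    Seq((10003014L, "MH43AJ411"), (10003099L, "XX00YY0000"))
      .toDF("ACCOUNTNO", "VEHICLENUMBER")
      .createOrReplaceTempView("customer_account")
    Seq((41L, 10003014L, "NAVI MUMBAI"))
      .toDF("CUSTADDRESSID", "ACCOUNTNO", "CITY")
      .createOrReplaceTempView("customer_address")

    val select =
      "SELECT c.ACCOUNTNO AS C_ACCNO, c.VEHICLENUMBER, o.CITY FROM customer_account c"
    val on = "customer_address o ON c.ACCOUNTNO = o.ACCOUNTNO"

    val inner = spark.sql(s"$select INNER JOIN $on")
    val left = spark.sql(s"$select LEFT JOIN $on")
    (inner.count(), left.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("LeftJoinSketch")
      .getOrCreate()
    val (innerRows, leftRows) = joinCounts(spark)
    println(s"inner join rows: $innerRows, left join rows: $leftRows")
    spark.stop()
  }
}
```

The sketch uses createOrReplaceTempView, which replaced the deprecated registerTempTable in Spark 2.x; both register a DataFrame under a name that spark.sql can query.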

After this, I joined the other tables in further queries. The final SparkSQL query is:

val queryF = "SELECT C_ACCNO AS ACCOUNTNO, collect_set(struct(VEHICLENUMBER, CUSTOMERID,ACCOUNTGROUPID,PREPAIDACCOUNTSTATUSID,PREPAIDACCOUNTSTATUSDATE,SOURCEOFENTRY,REVENUECATEGORYID,VEHICLECLASS,SERIALNO,HEXTAGID,TAGSTATUS,TAGSTARTEFFDATE,TAGENDEFFDATE,ISTAGBLACKLISTED,ISBLACKLISTHOLD,RCVERIFICATIONSTATUS,EMAILADDRESS,PHONENUMBER,ISFEEWAIVER,FEEWAIVERPASSTYPE,VEHICLEIMGVERIFICATIONSTATUS,TAGTID,ISREVENUERECHARGE,CCreatedDate,CCreatedUser,CUpdatedDate,CUpdatedUser)) as VEHICLE, collect_set(struct(CUSTADDRESSID ,ADDRESSTYPE ,ADDRESSLINE1 ,ADDRESSLINE2 ,ADDRESSLINE3 ,CITY,STATE  ,COUNTRY  ,ZIP1  ,ISACTIVE  ,ISCOMMUNICATION  ,CREATEDDATE  ,CREATEDUSER  ,UPDATEDDATE  ,UPDATEDUSER  ,REASONCODE ,ZIP2)) as ADDRESS, collect_set(struct(ORGANISATIONNAME,DATEOFINCORPORATION,PANCARDNUMBER,ORGANIZATIONTYPEID)) as BUSINESS, collect_set(struct(CUSTMAILID,EMAILTYPE,EMAIL,ISACTIVE_EMAIL,ISCOMMUNICATION_EMAIL)) as EMAIL, collect_set(struct(LOGINID,    USERNAME,   PASSWORD,   LAST_LOGINDATE, LAST_PWD_MODIFIEDDATE,  CURRENT_PWD_EXPIRYDATE, PWD_ATTEMPTS_COUNT, PINNUMBER,  ISLOCKED,THEMES,LANGUAGES,  STATUSID,   USERTYPEID, ROLENAME,   SQ_ATTEMPTCOUNT,    SQ_LOCKOUTTIME)) as LOGIN, collect_set(struct(CUSTPHONEID,  PHONETYPE, PHONENUMBER_PHONES,  EXTENTION, ISACTIVE_PHONES, ISCOMMUNICATION_PHONES)) as PHONES FROM joined_acc_phones GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO"
     val res0F = sqlContext.sql(queryF.toString)
     res0F.show(10)
     res0F.coalesce(1).write.json("D:/result01")
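
One thing to keep in mind with this join-then-aggregate approach: joining several one-to-many tables on ACCOUNTNO multiplies the rows per account (vehicles times addresses times emails, and so on) before collect_set removes the duplicates again. An alternative, which is not what the answer above does, is to build each array from its own table and then left-join the small per-account results. The following is a minimal sketch of that idea under stated assumptions: the object name PerTableArraysSketch, the helpers nest and build, the reduced column lists, and the sample rows are all hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set, struct}

// Hypothetical alternative, not the approach used in the answer.
object PerTableArraysSketch {

  // Collapse one child table into a single array column per ACCOUNTNO.
  // Every column except ACCOUNTNO becomes a field of the nested struct.
  def nest(df: DataFrame, arrayName: String): DataFrame = {
    val fields = df.columns.filter(_ != "ACCOUNTNO").map(col)
    df.groupBy("ACCOUNTNO").agg(collect_set(struct(fields: _*)).as(arrayName))
  }

  // One row per account, with one array per source table.
  def build(vehicles: DataFrame, emails: DataFrame, phones: DataFrame): DataFrame =
    nest(vehicles, "VEHICLE")
      .join(nest(emails, "EMAIL"), Seq("ACCOUNTNO"), "left")
      .join(nest(phones, "PHONES"), Seq("ACCOUNTNO"), "left")
      .select("ACCOUNTNO", "EMAIL", "PHONES", "VEHICLE")
      .orderBy("ACCOUNTNO")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("PerTableArraysSketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows; account 10003014 has no email or phone row.
    val vehicles = Seq(
      (10078435L, "MH01AX5658"),
      (10078435L, "MH43AJ411"),
      (10003014L, "GJ15Z8173")
    ).toDF("ACCOUNTNO", "VEHICLENUMBER")
    val emails = Seq((38404L, 10078435L, "PrimaryEmail", "something.ok@gmail.com"))
      .toDF("CUSTMAILID", "ACCOUNTNO", "EMAILTYPE", "EMAILADDRESS")
    val phones = Seq((91831L, 10078435L, "MobileNo", "9999999999"))
      .toDF("CUSTPHONEID", "ACCOUNTNO", "PHONETYPE", "PHONENUMBER")

    build(vehicles, emails, phones).show(false)
    spark.stop()
  }
}
```

With this shape, an account that has no rows in a child table gets a NULL array column for it. In the join-then-aggregate version, the struct built over the NULL columns of an unmatched left join is itself non-null, so such an account would be expected to get a one-element array holding an object with all-null fields instead.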
Votes: 1

Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/51653050
