首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用scala扁平Spark2中嵌套的Json文档

用scala扁平Spark2中嵌套的Json文档
EN

Stack Overflow用户
提问于 2018-03-05 14:15:36
回答 2查看 6.2K关注 0票数 1

我正在尝试将json文件解析为csv文件。

这个结构有点复杂,我用scala编写了一个火花程序来完成这个任务。就像文档在每行不包含json对象一样,我决定使用wholeTextFiles方法,就像我在一些答案和帖子中建议的那样。

代码语言:javascript
复制
val jsonRDD  = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)

然后我在dataframe中读取json内容。

代码语言:javascript
复制
val dwdJson = spark.read.json(jsonRDD)

然后,我想导航json和扁平的数据。这是来自dwdJson的模式

代码语言:javascript
复制
root
 |-- meta: struct (nullable = true)
 |    |-- dimensions: struct (nullable = true)
 |    |    |-- lat: long (nullable = true)
 |    |    |-- lon: long (nullable = true)
 |    |-- directory: string (nullable = true)
 |    |-- filename: string (nullable = true)
 |-- records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- grids: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- gPt: array (nullable = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: string (nullable = true)

这是我最好的方法:

代码语言:javascript
复制
val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename"
      ,$"time"
      ,$"gPt".getItem(0).as("lat1")
      ,$"gPt".getItem(1).as("long1")
      ,$"gPt".getItem(2).as("lat2")
      ,$"gPt".getItem(3).as("long2")
      ,$"gPt".getItem(4).as("value"))

我是scala新手,我想知道我是否可以避免创建中间数据(dwdJson_e1、dwdJson_e2、dwdJson_e3),这看起来效率很低,程序运行非常慢(与运行在膝上型计算机中的java解析器相比)。

另一方面,我找不到如何解除这些嵌套数组的绑定。

spark版本: 2.0.0 scala: 2.11.8 java: 1.8

**

编辑1:示例Json文件和csv输出

**

这是一个我想转换的示例Json文件:

代码语言:javascript
复制
{
  "meta" : {
    "directory" : "weather/cosmo/de/grib/12/aswdir_s",
    "filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
    "dimensions" : {
      "lon" : 589,
      "time" : 3,
      "lat" : 441
    }
   },
  "records" : [ {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
    } ],
    "time" : "2018-02-23T12:15:00Z"
  }, {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
    }
    ],
    "time" : "2018-02-23T12:30:00Z"
    }
    ]
}

这是上面json的一个示例输出:

代码语言:javascript
复制
filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55, 45.2, 13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575, 45.2, 13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6, 45.2, 13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175, 13.55, 45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,45.175,13.6,45.2,13.625,4.545918E-7

任何帮助都将不胜感激。致以亲切的问候,

EN

回答 2

Stack Overflow用户

发布于 2018-10-24 11:49:10

你可以试试下面的代码。对于complext json医生我起了作用。

代码语言:javascript
复制
def flattenDataframe(df: DataFrame): DataFrame = {

val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
val length = fields.length

for(i <- 0 to fields.length-1){
  val field = fields(i)
  val fieldtype = field.dataType
  val fieldName = field.name
  fieldtype match {
    case arrayType: ArrayType =>
      val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
      val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
     // val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
      val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
      return flattenDataframe(explodedDf)
    case structType: StructType =>
      val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
      val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
      val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_"))))
     val explodedf = df.select(renamedcols:_*)
      return flattenDataframe(explodedf)
    case _ =>
  }
}
df

}

票数 4
EN

Stack Overflow用户

发布于 2018-03-06 09:15:15

我认为你的做法完全正确。关于avoid create the intermediate dataframes,您实际上可以连续地编写语句,而不必将其分解为中间数据格式,如

代码语言:javascript
复制
 val df = dwdJson.select($"meta.filename", explode($"records").as("record")).
    select($"filename", $"record.time", explode($"record.grids").as("grids")).
    select($"filename", $"time", $"grids.gpt").
    select($"filename", $"time", 
              $"gpt"(0).as("lat1"), 
              $"gpt"(1).as("long1"), 
              $"gpt"(2).as("lat2"),
              $"gpt"(3).as("long2"), 
              $"gpt"(4).as("value"))

我也考虑到了表演的问题。Spark在内部使用Jackson lib来解析json,并且它必须通过输入的采样记录来插入模式本身(默认的样本比率为1.0,即所有记录)。因此,如果您有大输入、大文件( wholeTextFiles操作)和复杂的模式,它将影响程序性能。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49112437

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档