I have a df with the following schema -
root
|-- arrayCol: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- email: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- qty: long (nullable = true)
| | |-- rqty: long (nullable = true)
| | |-- pids: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- sqty: long (nullable = true)
| | |-- id1: string (nullable = true)
| | |-- id2: string (nullable = true)
| | |-- window: struct (nullable = true)
| | | |-- end: string (nullable = true)
| | | |-- start: string (nullable = true)
| | |-- otherId: string (nullable = true)
|-- primarykey: string (nullable = true)
|-- runtime: string (nullable = true)
I don't want to use explode because it is very slow, so I want to try flatMap instead.
I tried -
val ds = df1.as[(Array[StructType], String, String)]
ds.flatMap{ case(x, y, z) => x.map((_, y, z))}.toDF()
This gives me an error -
scala.MatchError: org.apache.spark.sql.types.StructType
How do I flatten arrayCol?
Sample data -
{
"primaryKeys":"sfdfrdsdjn",
"runtime":"2020-10-31T13:01:04.813Z",
"arrayCol":[{"id":"qwerty","id1":"dsfdsfdsf","window":{"start":"2020-11-01T10:30:00Z","end":"2020-11-01T12:30:00Z"}, "email":[],"id2":"sdfsdfsdPuyOplzlR1idvfPkv5138g","rqty":3,"sqty":3,"qty":3,"otherId":null}]
}
Expected output -
primaryKey runtime arrayCol
sfdfrdsdjn 2020-10-31T13:01:04.813Z {"id":"qwerty","id1":"dsfdsfdsf","window":{"start":"2020-11-01T10:30:00Z","end":"2020-11-01T12:30:00Z"}, "email":[],"id2":"sdfsdfsdPuyOplzlR1idvfPkv5138g","rqty":3,"sqty":3,"qty":3,"otherId":null}
I want each element of arrayCol to be on its own row, just like explode(arrayCol).
Posted on 2020-11-22 21:45:37
You were almost there. Remember, when using Spark with Scala, always try to use the Dataset API as often as possible. This not only improves readability but also helps to quickly resolve these types of problems.
case class ArrayColWindow(end:String,start:String)
case class ArrayCol(id:String,email:Seq[String], qty:Long,rqty:Long,pids:Seq[String],
sqty:Long,id1:String,id2:String,window:ArrayColWindow, otherId:String)
case class FullArrayCols(arrayCol:Seq[ArrayCol],primarykey:String,runtime:String)
// requires a SparkSession in scope plus: import spark.implicits._
val inputTest = List(
FullArrayCols(Seq(ArrayCol("qwerty", Seq(), 3, 3, Seq(), 3, "dsfdsfdsf", "sdfsdfsdPuyOplzlR1idvfPkv5138g",
  // named arguments, since ArrayColWindow declares end before start
  ArrayColWindow(end = "2020-11-01T12:30:00Z", start = "2020-11-01T10:30:00Z"), null)),
"sfdfrdsdjn", "2020-10-31T13:01:04.813Z")
).toDS()
val output = inputTest.as[(Seq[ArrayCol],String,String)].flatMap{ case(x, y, z) => x.map((_, y, z))}
output.show(truncate = false)
Posted on 2020-11-22 22:08:32
You can simply change
val ds = df1.as[(Array[StructType], String, String)]
to
val ds = df1.as[(Array[String], String, String)]
That way you get rid of the error and see the output you want.
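Setting Spark aside, the core of the first answer's flatMap is plain Scala, so the flattening logic can be sanity-checked on ordinary collections. A minimal sketch (Window and Item are hypothetical stand-ins for the real struct; the sample values come from the question):

```scala
case class Window(start: String, end: String)
case class Item(id: String, window: Window)

object FlattenSketch {
  // One input row: the array column plus the two scalar columns.
  val row: (Seq[Item], String, String) = (
    Seq(Item("qwerty", Window("2020-11-01T10:30:00Z", "2020-11-01T12:30:00Z"))),
    "sfdfrdsdjn",
    "2020-10-31T13:01:04.813Z"
  )

  // Same shape as ds.flatMap { case (x, y, z) => x.map((_, y, z)) }:
  // every element of the array becomes its own (element, primarykey, runtime) row.
  val flattened: Seq[(Item, String, String)] =
    Seq(row).flatMap { case (x, y, z) => x.map((_, y, z)) }
}
```

With two elements in the array you would get two output rows, which is exactly the explode(arrayCol) behavior the question asks to replicate.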
https://stackoverflow.com/questions/64952560