I have a sample DataFrame created with the code below:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}
import org.apache.spark.sql.functions.{struct, col}

val data = Seq(
  Row(20.0, "dog"),
  Row(3.5, "cat"),
  Row(0.000006, "ant")
)

val schema = StructType(
  List(
    StructField("weight", DoubleType, true),
    StructField("animal_type", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)

val actualDF = df.withColumn(
  "animal_interpretation",
  struct(
    (col("weight") > 5).as("is_large_animal"),
    col("animal_type").isin("rat", "cat", "dog").as("is_mammal")
  )
)

actualDF.show(false)
+------+-----------+---------------------+
|weight|animal_type|animal_interpretation|
+------+-----------+---------------------+
|20.0  |dog        |[true,true]          |
|3.5   |cat        |[false,true]         |
|6.0E-6|ant        |[false,false]        |
+------+-----------+---------------------+

The schema of this DataFrame can be printed with:
scala> actualDF.printSchema
root
 |-- weight: double (nullable = true)
 |-- animal_type: string (nullable = true)
 |-- animal_interpretation: struct (nullable = false)
 |    |-- is_large_animal: boolean (nullable = true)
 |    |-- is_mammal: boolean (nullable = true)

However, I want to get this schema as a DataFrame with three columns (field, type, nullable). The output should look like this:
+-------------------------------------+--------------+--------+
|field                                |type          |nullable|
+-------------------------------------+--------------+--------+
|weight                               |double        |true    |
|animal_type                          |string        |true    |
|animal_interpretation                |struct        |false   |
|animal_interpretation.is_large_animal|boolean       |true    |
|animal_interpretation.is_mammal      |boolean       |true    |
+-------------------------------------+--------------+--------+

How can I achieve this in Spark? I am coding in Scala.
Posted on 2019-07-16 22:57:35
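Below is a complete example that includes your code. Like Shankar, I used the common flattenSchema pattern of matching on each field's data type to walk the struct. Instead of having the method return the flattened schema, however, I used an ArrayBuffer to collect a (field, type, nullable) tuple for every field of the StructType and returned that ArrayBuffer. I then converted the ArrayBuffer to a Seq and finally used Spark to turn the Seq into a DataFrame.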
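A minimal sketch of that accumulator idea in isolation, assuming only the `org.apache.spark.sql.types` classes (no SparkSession is needed to build or walk a StructType). Unlike a shared top-level `fieldStructs` buffer, the buffer here is created inside each call, so calling the function twice does not append duplicate rows. The object name `SchemaRows` is hypothetical.

```scala
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

object SchemaRows {
  // Walks the schema depth-first, appending one (field, type, nullable)
  // triple per field. The buffer is local to each call (an assumption of
  // this sketch), so repeated calls never see rows from earlier calls.
  def flattenSchema(schema: StructType): Seq[(String, String, Boolean)] = {
    val acc = ArrayBuffer.empty[(String, String, Boolean)]

    def visit(st: StructType, prefix: Option[String]): Unit =
      st.fields.foreach { field =>
        // Qualified name such as "animal_interpretation.is_mammal"
        val name = prefix.fold(field.name)(p => s"$p.${field.name}")
        field.dataType match {
          case nested: StructType =>
            // Record the struct itself ("struct"), then its children
            acc += ((name, nested.typeName, field.nullable))
            visit(nested, Some(name))
          case other =>
            acc += ((name, other.simpleString, field.nullable))
        }
      }

    visit(schema, None)
    acc.toList
  }
}
```

In spark-shell (where `spark.implicits._` is already in scope), `SchemaRows.flattenSchema(actualDF.schema).toDF("field", "type", "nullable").show(false)` should give the table requested in the question.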
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}
import org.apache.spark.sql.functions.{struct, col}
import scala.collection.mutable.ArrayBuffer
import spark.implicits._ // needed for .toDF on a Seq (already in scope in spark-shell)

val data = Seq(
  Row(20.0, "dog"),
  Row(3.5, "cat"),
  Row(0.000006, "ant")
)

val schema = StructType(
  List(
    StructField("weight", DoubleType, true),
    StructField("animal_type", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)

val actualDF = df.withColumn(
  "animal_interpretation",
  struct(
    (col("weight") > 5).as("is_large_animal"),
    col("animal_type").isin("rat", "cat", "dog").as("is_mammal")
  )
)

// Collects one (field, type, nullable) tuple per schema field
val fieldStructs = new ArrayBuffer[(String, String, Boolean)]()

def flattenSchema(
    schema: StructType,
    fieldStructs: ArrayBuffer[(String, String, Boolean)],
    prefix: String = null
): ArrayBuffer[(String, String, Boolean)] = {
  schema.fields.foreach { field =>
    // Qualified name, e.g. "animal_interpretation.is_mammal"
    val fieldName = if (prefix == null) field.name else prefix + "." + field.name
    field.dataType match {
      case st: StructType =>
        // Record the struct itself ("struct"), then recurse into its fields
        fieldStructs += ((fieldName, st.typeName, field.nullable))
        flattenSchema(st, fieldStructs, fieldName)
      case other =>
        fieldStructs += ((fieldName, other.simpleString, field.nullable))
    }
  }
  fieldStructs
}

val foo = flattenSchema(actualDF.schema, fieldStructs).toSeq.toDF("field", "type", "nullable")
foo.show(false)

If you run the above, you should get the following:
+-------------------------------------+-------+--------+
|field                                |type   |nullable|
+-------------------------------------+-------+--------+
|weight                               |double |true    |
|animal_type                          |string |true    |
|animal_interpretation                |struct |false   |
|animal_interpretation.is_large_animal|boolean|true    |
|animal_interpretation.is_mammal      |boolean|true    |
+-------------------------------------+-------+--------+

Posted on 2019-07-16 20:12:46
You can do something like this:
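import org.apache.spark.sql.types.StructType
import spark.implicits._ // for .toDF (already in scope in spark-shell)

// Note: this version emits only leaf fields; a struct column such as
// animal_interpretation gets no row of its own, only its children do.
def flattenSchema(schema: StructType, prefix: String = null): Seq[(String, String, Boolean)] = {
  schema.fields.toSeq.flatMap { field =>
    val fieldName = if (prefix == null) field.name else prefix + "." + field.name
    field.dataType match {
      case st: StructType => flattenSchema(st, fieldName)
      case other          => Seq((fieldName, other.simpleString, field.nullable))
    }
  }
}

// show(false) so long names like animal_interpretation.is_large_animal are not truncated
flattenSchema(actualDF.schema).toDF("field", "type", "nullable").show(false)

Hope this helps!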
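A possible variant of the flatMap approach that also emits a row for each struct column, so the result matches the table asked for in the question. This is a sketch, not the answerer's code; the object name `SchemaRowsFlatMap` is hypothetical. It uses `typeName` for structs because `simpleString` on a StructType would return the full `struct<is_large_animal:boolean,is_mammal:boolean>` string rather than just `struct`.

```scala
import org.apache.spark.sql.types._

object SchemaRowsFlatMap {
  // Immutable recursion: a struct field contributes its own row
  // (typeName, i.e. "struct") followed by the rows of its children.
  def flattenSchema(schema: StructType, prefix: String = ""): Seq[(String, String, Boolean)] =
    schema.fields.toSeq.flatMap { field =>
      val name = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
      field.dataType match {
        case st: StructType =>
          (name, st.typeName, field.nullable) +: flattenSchema(st, name)
        case other =>
          Seq((name, other.simpleString, field.nullable))
      }
    }
}
```

With `spark.implicits._` in scope, `SchemaRowsFlatMap.flattenSchema(actualDF.schema).toDF("field", "type", "nullable").show(false)` would produce the DataFrame. Avoiding a mutable buffer keeps the function safe to call repeatedly.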
https://stackoverflow.com/questions/57063843