I have the following schema:
root
|-- id: string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)

How can I pass the 'cars' struct to a UDAF? If I only want to pass the cars sub-struct, what should inputSchema be?
Posted on 2019-02-05 05:14:12
You can, but the UDAF's logic will be different. For example, suppose you have two rows:
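// cars and cars_schema are assumed to be case classes (not shown), with cars wrapping a field named cars
val seq = Seq(cars(cars_schema("car1", "car2", "car3")), cars(cars_schema("car1", "car2", "car3")))
val rdd = spark.sparkContext.parallelize(seq)

The schema here is: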
root
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)

Then, if you try to call the aggregation:
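import spark.implicits._ // needed for toDF
val df = seq.toDF
df.agg(agg0(col("cars"))) // agg0 is your UDAF instance

you must change the UDAF input schema as follows: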
val carsSchema = StructType(List(
  StructField("car1", StringType, true),
  StructField("car2", StringType, true),
  StructField("car3", StringType, true)))

In the body of your UDAF, you have to handle this schema by changing inputSchema:
override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

In the update method, you have to handle the format of the input row:
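override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  // a struct column arrives as a nested Row, not as an array
  val i = input.getStruct(0)
  // i here would be [car1,car2,car3], a Row holding the three string fields
  buffer(0) = ???
}

From here, you can transform i to update the buffer, and then complete the merge and evaluate functions.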
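To make the remaining steps concrete, here is a minimal sketch of a complete UDAF that takes the cars struct as its single input. The aggregation itself (counting non-null car entries across all rows) is a made-up example, not something from the question, and the names CountCars, CarsSchema, Cars and Demo are hypothetical stand-ins. It assumes the UserDefinedAggregateFunction API, which still exists in Spark 3.x but is deprecated there in favor of Aggregator.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical case classes standing in for cars / cars_schema from the answer.
case class CarsSchema(car1: String, car2: String, car3: String)
case class Cars(cars: CarsSchema)

// Hypothetical aggregate: counts the non-null car entries over all rows.
object CountCars extends UserDefinedAggregateFunction {
  private val carsSchema = StructType(List(
    StructField("car1", StringType, true),
    StructField("car2", StringType, true),
    StructField("car3", StringType, true)))

  // One input column whose type is the cars struct.
  override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
  // The buffer holds a single running count.
  override def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val i = input.getStruct(0) // the struct arrives as a nested Row
      val nonNull = (0 until i.length).count(idx => !i.isNullAt(idx))
      buffer(0) = buffer.getLong(0) + nonNull
    }
  }

  // Combine two partial counts.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  }

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udaf-struct").getOrCreate()
    import spark.implicits._
    val df = Seq(Cars(CarsSchema("a", "b", "c")), Cars(CarsSchema("d", null, "f"))).toDF
    df.agg(CountCars(col("cars"))).show()
    spark.stop()
  }
}
```

The null check on the struct matters because the cars column is nullable in the schema above. If you need a different result type, change bufferSchema, dataType and evaluate together so they stay consistent.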
https://stackoverflow.com/questions/54518102