首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将结构传递给spark中的UDAF

将结构传递给spark中的UDAF
EN

Stack Overflow用户
提问于 2019-02-04 22:17:40
回答 1查看 221关注 0票数 1

我有以下模式-

代码语言:javascript
复制
root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

如何将'cars‘结构传递给udaf?如果我只想传递cars子结构,那么inputSchema应该是什么。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-05 05:14:12

你可以,但UDAF的逻辑将是不同的。例如,如果您有两行:

代码语言:javascript
复制
val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))

val rdd = spark.sparkContext.parallelize(seq)

这里的模式是

代码语言:javascript
复制
root
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)

然后,如果您尝试调用聚合:

代码语言:javascript
复制
val df = seq.toDF
df.agg(agg0(col("cars")))

您必须更改UDAF输入模式,如下所示:

代码语言:javascript
复制
val carsSchema =
    StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))

在你的UDAF的男孩中,你必须处理这个模式,改变inputSchema:

代码语言:javascript
复制
override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

在update方法中,必须处理输入行的格式:

代码语言:javascript
复制
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val i = input.getAs[Array[Array[String]]](0)
  // i here would be [car1,car2,car3],  an array of strings
  buffer(0) = ???
}

从这里开始,您可以转换i以更新缓冲区,并完成合并和求值功能。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54518102

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档