首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星星之火递归函数,因为udf生成异常。

星星之火递归函数,因为udf生成异常。
EN

Stack Overflow用户
提问于 2017-09-27 09:01:14
回答 1查看 2K关注 0票数 0

我正在使用DataFrames,哪些元素具有类似于以下模式的模式:

代码语言:javascript
复制
root
 |-- NPAData: struct (nullable = true)
 |    |-- NPADetails: struct (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- manager: string (nullable = true)
 |    |-- service: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- serviceName: string (nullable = true)
 |    |    |    |-- serviceCode: string (nullable = true) 
 |-- NPAHeader: struct (nullable = true)
 |    |    |-- npaNumber: string (nullable = true)
 |    |    |-- date: string (nullable = true)

在我的DataFrame中,我希望对具有相同NPAHeader.code的所有元素进行分组,为此,我将使用以下行:

代码语言:javascript
复制
val groupedNpa = orderedNpa.groupBy($"NPAHeader.code" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))

在此之后,我将使用以下模式进行数据访问:

代码语言:javascript
复制
StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))

每一行的一个例子将类似于:

代码语言:javascript
复制
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]

现在,我想要的是生成另一个DataFrame,它只拾取WrappedArray中的一个元素,所以我想要一个类似于:

代码语言:javascript
复制
[1234,npaNew]

注意: WrappedArray中选择的元素是在遍历整个WrappedArray之后匹配complext逻辑的元素。但是为了简化这个问题,我将始终选择WrappedArray的最后一个元素(在对进行迭代之后的)。

要做到这一点,我想要定义一个递归的udf。

代码语言:javascript
复制
import org.apache.spark.sql.functions.udf

def returnRow(elementList : Row)(index:Int): Row = {
  val dif = elementList.size - index
  val row :Row = dif match{
    case 0 => elementList.getAs[Row](index)
    case _ => returnRow(elementList)(index + 1) 
  }
  row
} 

val returnRow_udf = udf(returnRow _)


groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}

但我在地图上得到了以下错误:

线程“主”java.lang.UnsupportedOperationException中的异常:不支持Int => Unit类型的模式

我做错了什么?

顺便提一下,我不确定我是否正确地传递了带有npagroupedNpa("npa")列。我以行的形式加入WrappedArray,因为我不知道如何在Array[Row]上迭代( get(index)方法不存在于ArrayRow中)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-27 10:23:01

TL;博士只是使用了如何选择每组的第一行?中描述的一种方法

如果要使用复杂逻辑并返回Row,可以跳过SQL API并使用groupByKey

代码语言:javascript
复制
val f: (String, Iterator[org.apache.spark.sql.Row]) => Row
val encoder: Encoder 
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder)

或更好:

代码语言:javascript
复制
val g: (Row, Row) => Row

df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g)

其中encoder是一个有效的RowEncoder (当试图将dataframe行映射到更新的行时出现编码器错误)。

您的代码在多个方面都有错误:

  • groupBy不保证值的顺序。所以: orderBy(...).groupBy(....).agg(collect_list(...)) 可以有不确定的输出。如果您真的决定走这个路线,您应该跳过orderBy并显式地对收集的数组进行排序。
  • 不能将咖喱函数传递给udf。您必须先取消它,但是它需要不同的参数顺序(参见下面的示例)。
  • 如果可以的话,这可能是调用它的正确方法(请注意,省略了第二个参数): returnRow_udf(groupedNpa("npa")(0)) 更糟糕的是,您可以在map中调用它,在这里,udfs根本不适用。
  • udf不能返回Row。它必须返回外部Scala型
  • array<struct>的外部表示形式是Seq[Row]。您不能仅仅用Row来代替它。
  • 可以使用apply通过索引访问SQL数组: Df.select($“array”(大小($“array”)- 1)) 但由于非决定论,这并不是一种正确的方法。您可以应用sort_array,但正如在开始时指出的,有更有效的解决方案。
  • 令人惊讶的是,递归与此无关。您可以这样设计函数: def size(i: Int=0)( xs : SeqAny):Int =xs match { case () => I大小写为null => I case (h,t@ _*) => size(i + 1)(t) } val size_ = udf(size() _) 它会运作得很好: (1,Seq("a","b",“c”)).toDF(“id”,"array") .select(size_($"array")) 尽管递归是一个过头,但如果您只需在Seq上迭代。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46443668

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档