我正在使用DataFrames,哪些元素具有类似于以下模式的模式:
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- npaNumber: string (nullable = true)
| | |-- date: string (nullable = true)在我的DataFrame中,我希望对具有相同NPAHeader.code的所有元素进行分组,为此,我将使用以下行:
val groupedNpa = orderedNpa.groupBy($"NPAHeader.code" ).agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))在此之后,我将使用以下模式进行数据访问:
StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))每一行的一个例子将类似于:
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]现在,我想要的是生成另一个DataFrame,它只拾取WrappedArray中的一个元素,所以我想要一个类似于:
[1234,npaNew]注意: WrappedArray中选择的元素是在遍历整个WrappedArray之后匹配complext逻辑的元素。但是为了简化这个问题,我将始终选择WrappedArray的最后一个元素(在对进行迭代之后的)。
要做到这一点,我想要定义一个递归的udf。
import org.apache.spark.sql.functions.udf
def returnRow(elementList : Row)(index:Int): Row = {
val dif = elementList.size - index
val row :Row = dif match{
case 0 => elementList.getAs[Row](index)
case _ => returnRow(elementList)(index + 1)
}
row
}
val returnRow_udf = udf(returnRow _)
groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}但我在地图上得到了以下错误:
线程“主”java.lang.UnsupportedOperationException中的异常:不支持Int => Unit类型的模式
我做错了什么?
顺便提一下,我不确定我是否正确地传递了带有npa的groupedNpa("npa")列。我以行的形式加入WrappedArray,因为我不知道如何在Array[Row]上迭代( get(index)方法不存在于ArrayRow中)
发布于 2017-09-27 10:23:01
TL;博士只是使用了如何选择每组的第一行?中描述的一种方法
如果要使用复杂逻辑并返回Row,可以跳过SQL API并使用groupByKey。
val f: (String, Iterator[org.apache.spark.sql.Row]) => Row
val encoder: Encoder
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder)或更好:
val g: (Row, Row) => Row
df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g)其中encoder是一个有效的RowEncoder (当试图将dataframe行映射到更新的行时出现编码器错误)。
您的代码在多个方面都有错误:
groupBy不保证值的顺序。所以:
orderBy(...).groupBy(....).agg(collect_list(...))
可以有不确定的输出。如果您真的决定走这个路线,您应该跳过orderBy并显式地对收集的数组进行排序。udf。您必须先取消它,但是它需要不同的参数顺序(参见下面的示例)。map中调用它,在这里,udfs根本不适用。udf不能返回Row。它必须返回外部Scala型。array<struct>的外部表示形式是Seq[Row]。您不能仅仅用Row来代替它。apply通过索引访问SQL数组:
Df.select($“array”(大小($“array”)- 1))
但由于非决定论,这并不是一种正确的方法。您可以应用sort_array,但正如在开始时指出的,有更有效的解决方案。Seq上迭代。https://stackoverflow.com/questions/46443668
复制相似问题