我有一个scala函数,它接受一个spark dataframe并返回一个单值,也就是说两个值。这个函数很复杂,使用在DataFrame类中定义的聚合,调用其他java库,并且不能用SQL表达。它需要整个数据帧的内容来进行计算,它不能一次添加一行并建立一个结果。
我有一个大的dataframe,其中包含一个列,我想用它将dataframe分成小块,并对每个小块执行上述计算。然后,我想返回一个新的数据帧,其中包含每个组的一行和两列,一列包含groupby值,另一列包含结果。
使用PandasUDF这将是一个相对简单的任务,但是我不知道如何在Scala中做到这一点。
我尝试使用group by列对数据帧进行重新分区,然后调用mapPartitions,但是传递给mapPartitions的函数必须具有签名IteratorRow -> IteratorX。我可以很容易地使用IteratorRow并创建SeqRow或ListRow,但是似乎不可能从这个序列创建数据帧,因为计算是在工作节点上完成的,并且只能从驱动程序创建数据帧。由于使用了DataFrame的一些高级聚合函数(例如approxQuantile),因此需要进行大量的重新设计才能将原始函数重写为SeqRow。
问题的症结似乎在于,与Pandas相比,没有“本地(仅限/worker/非分布式)数据帧”的概念,而Pandas显然将数据帧限制为本地数据帧。
我是不是漏掉了什么明显的东西?
发布于 2020-04-27 00:32:14
我有一个大的dataframe,其中包含一个列,我想用它将dataframe分成小块,并对每个小块执行上述计算。
该列中的值是预先知道的吗?如果不是,它们至少是可收藏的吗?假设您可以收集它们,如下所示:
val chunkValues: Array[Any] = df.select("chunk")
.collect()
.map(r => r.getAs[Any](0))循环遍历这些值以多次过滤inputDF,并执行繁重的逻辑:
val chunkDFs: Array[DataFrame] = chunkValues.map(value => {
val chunkBeforeDF = inputDF.filter(col("chunk") === value)
val chunkAfterDF = yourLogic(chunkBefore)
})让他们再次联合起来。
https://stackoverflow.com/questions/61354823
复制相似问题