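How can I calculate the 50th and 15th percentiles of the students column, taking the occ column into account, without using array_repeat and without explode? My input data is huge, and explode runs out of memory.
My DF is:
name | occ | students
aaa  | 1   | 1
aaa  | 3   | 7
aaa  | 6   | 11
...
For example, if I treat students and occ as two arrays, then to compute the 50th percentile of students taking occ into account, I would normally do it like this: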
val students = Array(1,7,11)
val occ = Array(1,3,6)
This gives:
val student_repeated = Array(1, 7, 7, 7, 11, 11, 11, 11, 11, 11)
so student_50th is the 50th percentile of student_repeated => 11.
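The repeated array is only needed conceptually: the same number falls out of a running sum of weights over the (value, occ) pairs sorted by value. A minimal plain-Scala sketch, assuming the nearest-rank definition (the element at rank ceil(p × total) of the repeated array); the object and method names are hypothetical:

```scala
object WeightedPercentile {
  /** Nearest-rank percentile of `values`, where values(i) is counted weights(i) times.
    * Same result as sorting the repeated array and taking the element at rank
    * ceil(p * total), but the repeated array is never built. */
  def weightedPercentile(values: Seq[Double], weights: Seq[Long], p: Double): Double = {
    require(values.length == weights.length, "values and weights must have the same length")
    require(p >= 0.0 && p <= 1.0, "p must be in [0, 1]")
    // Drop zero-weight entries and sort by value
    val sorted = values.zip(weights).filter(_._2 > 0).sortBy(_._1)
    require(sorted.nonEmpty, "total weight must be positive")
    // Running total of weights: for (1,1), (7,3), (11,6) this is 1, 4, 10
    val cumulative = sorted.map(_._2).scanLeft(0L)(_ + _).tail
    val target = p * cumulative.last
    // First value whose running weight reaches the target rank
    sorted(cumulative.indexWhere(_ >= target))._1
  }
}
```

With the question's data, weightedPercentile(Seq(1.0, 7.0, 11.0), Seq(1L, 3L, 6L), 0.5) returns 11.0 and p = 0.15 returns 7.0, the same values Solution 1 reports. Memory is proportional to the number of distinct rows, not to the sum of occ.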
My current code is:
import spark.implicits._
import org.apache.spark.sql.functions._

val inputDF = Seq(
  ("aaa", 1, 1),
  ("aaa", 3, 7),
  ("aaa", 6, 11)
).toDF("name", "occ", "student")

// Solution 1: repeat each value occ times, then explode (this is what exhausts memory)
inputDF
  .withColumn("student", array_repeat(col("student"), col("occ")))
  .withColumn("student", explode(col("student")))
  .groupBy("name")
  .agg(
    percentile_approx(col("student"), lit(0.5), lit(10000)).alias("student_50"),
    percentile_approx(col("student"), lit(0.15), lit(10000)).alias("student_15")
  )
  .show(false)
which outputs:
+----+----------+----------+
|name|student_50|student_15|
+----+----------+----------+
|aaa |11        |7         |
+----+----------+----------+
EDIT: I am looking for the Scala equivalent of this solution: https://stackoverflow.com/a/58309977/4450090
EDIT2: I am now working with sketches-java: https://github.com/DataDog/sketches-java
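For comparison, explode can also be avoided with built-in Spark functions alone: a running sum of occ over a window ordered by student gives an exact nearest-rank percentile (the value at rank ceil(p × total) of the repeated column). This is a minimal sketch assuming Spark 3.x, not necessarily what the linked answer does; the object name and the weighted_pct column are hypothetical. Each window partition still holds all rows of one name, so one huge group remains a hot spot, but with its original row count rather than the exploded one.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowWeightedPercentile {
  /** Exact weighted nearest-rank percentile of `valueCol` per `groupCol`,
    * counting each row `weightCol` times, without array_repeat/explode. */
  def weightedPercentile(df: DataFrame, groupCol: String, valueCol: String,
                         weightCol: String, p: Double): DataFrame = {
    // Running weight in value order, and total weight, per group
    val running = Window.partitionBy(groupCol).orderBy(valueCol)
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val wholeGroup = Window.partitionBy(groupCol)
    df.withColumn("cum_w", sum(col(weightCol)).over(running))
      .withColumn("total_w", sum(col(weightCol)).over(wholeGroup))
      // Keep rows whose running weight reaches p * total ...
      .where(col("cum_w") >= lit(p) * col("total_w"))
      // ... the smallest of their values is the percentile
      .groupBy(groupCol)
      .agg(min(col(valueCol)).alias("weighted_pct"))
  }
}
```

Usage on the question's data: WindowWeightedPercentile.weightedPercentile(inputDF, "name", "student", "occ", 0.5). Taking the minimum over all qualifying rows keeps the result correct even when the same student value appears in several rows.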
Answered 2022-05-25 07:12:50
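I decided to use DDSketch, whose accept method lets the sketch be updated with a weighted value.
"com.datadoghq" % "sketches-java" % "0.8.2"
First, initialize an empty sketch. Then accept (value, weight) pairs, and finally call the DDSketch method getValueAtQuantile.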
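These three steps can be tried outside Spark first. A minimal sketch, assuming sketches-java 0.8.2 on the classpath and a relative accuracy of 0.01 (the object and method names are hypothetical): each (value, weight) pair goes in with a single accept(value, count) call, so the repeated array is never built.

```scala
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}

object DDSketchDemo {
  /** Approximate q-quantile of values, each counted with its weight. */
  def weightedQuantile(pairs: Seq[(Double, Double)], q: Double,
                       relativeAccuracy: Double = 0.01): Double = {
    // 1. Empty sketch with the requested relative accuracy
    val sketch: DDSketch = DDSketches.unboundedDense(relativeAccuracy)
    // 2. accept(value, count) adds the value with the given weight in one call
    pairs.foreach { case (value, weight) => sketch.accept(value, weight) }
    // 3. Approximate quantile, within relativeAccuracy relative error
    sketch.getValueAtQuantile(q)
  }
}
```

With the question's pairs (1, 1), (7, 3), (11, 6), the results for q = 0.5 and q = 0.15 should land within 1% of 11 and 7.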
I implemented the whole thing as a Spark Aggregator:
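import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}

// Input type (definition not shown in the answer; assumed shape). Field order must
// match the order of the columns passed to the UDAF: weight first, then value.
case class ValueWithWeigth(weight: Double, value: Double)

// pct-quantile of weighted values, using a DDSketch with the given relative accuracy
class DDSInitAgg(pct: Double, accuracy: Double) extends Aggregator[ValueWithWeigth, SketchData, Double] {
  private val precision: String = "%.6f"

  // Buffer starts as an empty, serialized sketch
  override def zero: SketchData = DDSUtils.sketchToTuple(DDSketches.unboundedDense(accuracy))

  // Add one value with its weight; no repeated copies are created
  override def reduce(b: SketchData, a: ValueWithWeigth): SketchData = {
    val s = DDSUtils.sketchFromTuple(b)
    s.accept(a.value, a.weight)
    DDSUtils.sketchToTuple(s)
  }

  // Combine partial sketches built on different partitions
  override def merge(b1: SketchData, b2: SketchData): SketchData = {
    val s1: DDSketch = DDSUtils.sketchFromTuple(b1)
    val s2: DDSketch = DDSUtils.sketchFromTuple(b2)
    s1.mergeWith(s2)
    DDSUtils.sketchToTuple(s1)
  }

  // Read the quantile, rounded to 6 decimals (Locale.ROOT keeps '.' as the decimal separator)
  override def finish(reduction: SketchData): Double = {
    val percentile: Double = DDSUtils.sketchFromTuple(reduction).getValueAtQuantile(pct)
    precision.formatLocal(java.util.Locale.ROOT, percentile).toDouble
  }

  override def bufferEncoder: Encoder[SketchData] = ExpressionEncoder[SketchData]()
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
You can run it as a UDAF that takes two columns as input. I also wrote methods to encode/decode a DDSketch to and from an Array[Byte]: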
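import java.util.function.Supplier
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}
import com.datadoghq.sketch.ddsketch.encoding.{ByteArrayInput, GrowingByteArrayOutput}
import com.datadoghq.sketch.ddsketch.store.{Store, UnboundedSizeDenseStore}

// Serialized sketch, used as the Aggregator buffer
case class SketchData(backingArray: Array[Byte], numWrittenBytes: Int)

object DDSUtils {
  val emptySketch: DDSketch = DDSketches.unboundedDense(0.01)
  // Store factory required when decoding a sketch
  val supplierStore: Supplier[Store] = () => new UnboundedSizeDenseStore()

  // DDSketch -> bytes; the index mapping is included (false = do not omit it)
  def sketchToTuple(s: DDSketch): SketchData = {
    val o = GrowingByteArrayOutput.withDefaultInitialCapacity()
    s.encode(o, false)
    SketchData(o.backingArray(), o.numWrittenBytes())
  }

  // bytes -> DDSketch, reading only the bytes that were actually written
  def sketchFromTuple(sketchData: SketchData): DDSketch = {
    val i: ByteArrayInput = ByteArrayInput.wrap(sketchData.backingArray, 0, sketchData.numWrittenBytes)
    DDSketch.decode(i, supplierStore)
  }
}
This is how I call the UDAF: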
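// 0.50 = median; 0.01 = 1% relative accuracy (an accuracy of 0.50 would allow up to 50% error)
val ddsInitAgg50UDAF: UserDefinedFunction = udaf(new DDSInitAgg(0.50, 0.01), ExpressionEncoder[ValueWithWeigth])
Finally, in the aggregation (the columns are bound to the fields of ValueWithWeigth by position):
ddsInitAgg50UDAF(col("weigthCol"), col("valueCol")).alias("value_pct_50")
https://stackoverflow.com/questions/72212758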
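Because the Aggregator buffer is the serialized sketch, the encode/decode round trip and mergeWith are exactly what Spark exercises between partitions. A minimal check outside Spark, assuming sketches-java 0.8.2 and the same encoding calls as DDSUtils (the object and method names are hypothetical): the question's three (value, weight) pairs are split over two partial sketches, each is serialized and decoded, and the merged median should come out within 1% of 11.

```scala
import java.util.function.Supplier
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}
import com.datadoghq.sketch.ddsketch.encoding.{ByteArrayInput, GrowingByteArrayOutput}
import com.datadoghq.sketch.ddsketch.store.{Store, UnboundedSizeDenseStore}

object SketchRoundTrip {
  private val storeSupplier: Supplier[Store] = () => new UnboundedSizeDenseStore()

  // Serialize, then deserialize, as happens to the aggregation buffer
  def roundTrip(s: DDSketch): DDSketch = {
    val out = GrowingByteArrayOutput.withDefaultInitialCapacity()
    s.encode(out, false)
    DDSketch.decode(ByteArrayInput.wrap(out.backingArray(), 0, out.numWrittenBytes()), storeSupplier)
  }

  /** Median of the question's data, built as two "partitions" and merged. */
  def mergedMedian(): Double = {
    val part1 = DDSketches.unboundedDense(0.01)
    part1.accept(1.0, 1.0)
    part1.accept(7.0, 3.0)
    val part2 = DDSketches.unboundedDense(0.01)
    part2.accept(11.0, 6.0)
    // Both sketches share the same index mapping, so they can be merged
    val merged = roundTrip(part1)
    merged.mergeWith(roundTrip(part2))
    merged.getValueAtQuantile(0.5)
  }
}
```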