首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >加权星火scala percentile_approx

加权星火scala percentile_approx
EN

Stack Overflow用户
提问于 2022-05-12 09:00:58
回答 1查看 65关注 0票数 0

如何在不使用percentile 15th和避免爆炸的情况下,考虑到occ列,计算列学生的percentile 50tharray_repeat?我有巨大的输入数据和爆炸摧毁了记忆。

我的DF是:

代码语言:javascript
复制
name | occ | students 
aaa     1         1
aaa     3         7
aaa     6         11
...

例如,如果我认为学生和occ是bot数组,那么如果考虑到occ,那么要计算数组学生的百分数50,我会像这样规范计算:

代码语言:javascript
复制
val students = Array(1,7,11)
val occ = Array(1,3,6)

it gives:
val student_repeated = Array(1,7,7,7,11,11,11,11,11,11)

那么student_50th将是student_repeated => 11的第50%。

我现在的代码是:

代码语言:javascript
复制
import spark.implicits._

val inputDF = Seq(
  ("aaa", 1, 1),
  ("aaa", 3, 7),
  ("aaa", 6, 11),
)
  .toDF("name", "occ", "student")

// Solution 1
inputDF
  .withColumn("student", array_repeat(col("student"), col("occ")))
  .withColumn("student", explode(col("student")))
  .groupBy("name")
  .agg(
    percentile_approx(col("student"), lit(0.5), lit(10000)).alias("student_50"),
    percentile_approx(col("student"), lit(0.15), lit(10000)).alias("student_15"),

  )
  .show(false)

其中产出:

代码语言:javascript
复制
+----+----------+----------+
|name|student_50|student_15|
+----+----------+----------+
|aaa |11        |7         |
+----+----------+----------+

编辑:我正在寻找与scala等价的解决方案:https://stackoverflow.com/a/58309977/4450090

EDIT2:我正在进行草图-java https://github.com/DataDog/sketches-java

EN

回答 1

Stack Overflow用户

发布于 2022-05-25 07:12:50

我已经决定使用dds草图,其中有方法接受,允许草图被更新。

代码语言:javascript
复制
"com.datadoghq" % "sketches-java" % "0.8.2"

首先,初始化空草图。然后,我接受一对值(值、重量),然后我叫dds示意图方法getValueAtQuantile。

我确实以Spark聚合器的身份执行了所有操作。

代码语言:javascript
复制
class DDSInitAgg(pct: Double, accuracy: Double) extends Aggregator[ValueWithWeigth, SketchData, Double]{
  private val precision: String = "%.6f"

  override def zero: SketchData = DDSUtils.sketchToTuple(DDSketches.unboundedDense(accuracy))

  override def reduce(b: SketchData, a: ValueWithWeigth): SketchData = {
    val s = DDSUtils.sketchFromTuple(b)
    s.accept(a.value, a.weight)
    DDSUtils.sketchToTuple(s)
  }

  override def merge(b1: SketchData, b2: SketchData): SketchData = {
    val s1: DDSketch = DDSUtils.sketchFromTuple(b1)
    val s2: DDSketch = DDSUtils.sketchFromTuple(b2)
    s1.mergeWith(s2)
    DDSUtils.sketchToTuple(s1)
  }

  override def finish(reduction: SketchData): Double = {
    val percentile: Double = DDSUtils.sketchFromTuple(reduction).getValueAtQuantile(pct)
    precision.format(percentile).toDouble
  }

  override def bufferEncoder: Encoder[SketchData] = ExpressionEncoder()
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

您可以以两列作为输入,作为联非新议程执行它。此外,我还开发了从DDSSketch <-> ArrayByte中来回编码/解码的方法

代码语言:javascript
复制
case class SketchData(backingArray: Array[Byte], numWrittenBytes: Int)

object DDSUtils {
  val emptySketch: DDSketch = DDSketches.unboundedDense(0.01)

  val supplierStore: Supplier[Store] = () => new UnboundedSizeDenseStore()

  def sketchToTuple(s: DDSketch): SketchData = {
    val o = GrowingByteArrayOutput.withDefaultInitialCapacity()
    s.encode(o, false)
    SketchData(o.backingArray(), o.numWrittenBytes())
  }

  def sketchFromTuple(sketchData: SketchData): DDSketch = {
    val i: ByteArrayInput = ByteArrayInput.wrap(sketchData.backingArray, 0, sketchData.numWrittenBytes)
    DDSketch.decode(i, supplierStore)
  }

}

这就是我所说的联非新议程

代码语言:javascript
复制
  val ddsInitAgg50UDAF: UserDefinedFunction = udaf(new DDSInitAgg(0.50, 0.50), ExpressionEncoder[ValueWithWeigth])

最后,在聚合中:

代码语言:javascript
复制
ddsInitAgg50UDAF(col("weigthCol"), col("valueCol")).alias("value_pct_50")
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72212758

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档