首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Spark streaming中避免批量大小的突然峰值?

如何在Spark streaming中避免批量大小的突然峰值?
EN

Stack Overflow用户
提问于 2017-11-17 22:42:06
回答 1查看 787关注 0票数 0

我正在从kafka流式传输数据,并试图将每批事件的数量限制在10个事件。在处理10-15批次后,批次大小突然出现峰值。下面是我的设置:

代码语言:javascript
复制
spark.streaming.kafka.maxRatePerPartition=1

spark.streaming.backpressure.enabled=true

spark.streaming.backpressure.pid.minRate=1

spark.streaming.receiver.maxRate=2

Please check this image for the streaming behavior

EN

回答 1

Stack Overflow用户

发布于 2018-01-25 23:53:41

这是spark中的bug,请参考:https://issues.apache.org/jira/browse/SPARK-18371

pull请求尚未合并,但您可以选择它并自行构建spark。

总结一下这个问题:

如果您将spark.streaming.backpressure.pid.minRate设置为number <= partition count,则计算的有效速率为0:

代码语言:javascript
复制
val totalLag = lagPerPartition.values.sum
...
    val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
...

(第二行计算每个分区的速率,其中rate是从PID开始的速率,默认为minRate,当PID计算它时,它应该更小),如下所示:DirectKafkaInputDStream code

结果为0会导致回退到(不合理的)分区头部:

代码语言:javascript
复制
    ...
    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }

    ...

maxMessagesPerPartition(offsets).map { mmp =>
  mmp.map { case (tp, messages) =>
    val lo = leaderOffsets(tp)
    tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
  }
}.getOrElse(leaderOffsets)

DirectKafkaInputDStream#clamp中的一样

这使得当你的实际和最小接收速率/msg/分区小于~等于分区数,并且你经历了显着的滞后时,背压基本上不起作用(例如,消息以峰值形式出现,并且你有恒定的处理能力)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47353207

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档