我正在从kafka流式传输数据,并试图将每批事件的数量限制在10个事件。在处理10-15批次后,批次大小突然出现峰值。下面是我的设置:
spark.streaming.kafka.maxRatePerPartition=1
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.pid.minRate=1
spark.streaming.receiver.maxRate=2发布于 2018-01-25 23:53:41
这是spark中的bug,请参考:https://issues.apache.org/jira/browse/SPARK-18371
pull请求尚未合并,但您可以选择它并自行构建spark。
总结一下这个问题:
如果您将spark.streaming.backpressure.pid.minRate设置为number <= partition count,则计算的有效速率为0:
val totalLag = lagPerPartition.values.sum
...
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
...(第二行计算每个分区的速率,其中rate是从PID开始的速率,默认为minRate,当PID计算它时,它应该更小),如下所示:DirectKafkaInputDStream code
结果为0会导致回退到(不合理的)分区头部:
...
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
...
maxMessagesPerPartition(offsets).map { mmp =>
mmp.map { case (tp, messages) =>
val lo = leaderOffsets(tp)
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
}
}.getOrElse(leaderOffsets)与DirectKafkaInputDStream#clamp中的一样
这使得当你的实际和最小接收速率/msg/分区小于~等于分区数,并且你经历了显着的滞后时,背压基本上不起作用(例如,消息以峰值形式出现,并且你有恒定的处理能力)。
https://stackoverflow.com/questions/47353207
复制相似问题