我有火花流应用程序从Kafka读取消息使用火花直接流(非接收者)方法和处理每个分区的消息。
在我的Kafka分区中,有时我们得到处理2000条消息需要20秒的消息,而对于相同的no,有些消息需要7-9秒。信息的传递。
在波动的情况下,我们打开背压设置如下。
spark.batch.duration=10 seconds
spark.streaming.kafka.maxRatePerPartition=200
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=60
spark.streaming.kafka.maxRatePerPartition=200并使用以下参数指定了RateEstimator。我不懂PID的数学,但尝试了不同的组合,其中之一如下。
spark.streaming.backpressure.rateEstimator=pid
spark.streaming.backpressure.pid.minRate=1600
spark.streaming.backpressure.pid.integral=1
spark.streaming.backpressure.pid.proportional=25
spark.streaming.backpressure.pid.derived=1最初,spark读取RDD中的一个分区的2000条消息,但过了一段时间,它开始读取800条记录。我认为是minRate/2,然后它保持不变..在日志中,它总是以新的速率打印1600。
2017-01-20 14:55:14 TRACE PIDRateEstimator:67 - New rate = 1600.0考虑到我的情况,我几乎没有什么问题:
spark.streaming.backpressure.pid.minRate或完全否。要分批读取的信息?


发布于 2021-03-15 15:27:03
回答你的问题#1 - spark.streaming.backpressure.pid.minRate是每个分区每秒的消息数量。
关于您的消息消耗率的差异,这可能是由于pid配置错误。
在大多数情况下,pid.proportional、pid.integral和pid.derived的默认值足够好。因此,如果没有正确理解pid速率估计器背后的数学知识的人,建议采用默认的方法。我要调的唯一参数是spark.streaming.backpressure.pid.minRate
见:https://richardstartin.github.io/posts/tuning-spark-back-pressure-by-simulation https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang/
https://stackoverflow.com/questions/41771622
复制相似问题