
Kafka direct consumer开始限制每批(5秒)读取450个事件(5 * 90个分区),在此之前运行了1到2天(每批约5000到40000个事件)
我使用在亚马逊网络服务中运行的spark独立集群(spark和spark-streaming-kafka版本1.6.1),并使用检查点目录StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext)的S3存储桶,没有调度延迟,每个工作节点上都有足够的磁盘空间。
没有改变任何Kafka客户端的初始化参数,非常确定kafka的结构没有改变:
val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)我也不明白为什么当直接消费者描述说The consumed offsets are by the stream itself时,我在创建流上下文时仍然需要使用检查点目录?
发布于 2016-12-19 20:51:26
这通常是通过将spark.streaming.backpressure.enabled设置为true来启用背压的结果。通常,当背压算法发现有比过去更多的数据进入时,它会开始将每批数据限制在一个相当小的大小,直到它可以再次“稳定”自己。这有时会有误报,并导致您的流减慢处理速度。
如果你想稍微调整一下启发式,它使用了一些没有文档记录的标志(只需确保你知道你在做什么):
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)如果你想要更多的细节,PIDRateEstimator就是你要找的。
https://stackoverflow.com/questions/41222247
复制相似问题