我正在运行一个spark结构的流作业,其中包括创建一个空数据帧,使用每个微批处理更新它,如下所示。每次微批处理执行时,阶段数增加4。为了避免重新计算,我在循环内的每个更新之后将更新后的StaticDF持久化到内存中。这有助于跳过每个新的微批次创建的那些额外的阶段。
我的问题是-
1)即使总的完成阶段保持不变,增加的阶段总是被跳过,但这是否会导致性能问题,因为在某个时间点上可能会有数百万个跳过的阶段?
2)当某些缓存的RDD部分或全部不可用时会发生什么?(节点/执行器故障)。Spark文档说,到目前为止,它还没有实现从多个微批次接收的所有数据,所以这是否意味着它将需要再次从Kafka读取所有事件来重新生成staticDF?
// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
.add("product_id", LongType)
.add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)
// Note : streamingDF was created from Kafka source
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.foreachBatch {
(micro_batch_DF: DataFrame) => {
// fetching max created_at for each product_id in current micro-batch
val staging_df = micro_batch_DF.groupBy("product_id")
.agg(max("created").alias("created"))
// Updating staticDF using current micro batch
staticDF = staticDF.unionByName(staging_df)
staticDF = staticDF
.withColumn("rnk",
row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
).filter("rnk = 1")
.drop("rnk")
.cache()
}

发布于 2020-04-23 15:51:11
即使跳过的阶段不需要任何计算,但我的作业在一定数量的批处理后开始失败。这是因为DAG在每次批处理执行时都会增长,使其无法管理,并抛出堆栈溢出异常。
为了避免这种情况,我不得不打破火花谱系,这样阶段的数量就不会随着每次运行而增加(即使它们被跳过了)。
https://stackoverflow.com/questions/61206084
复制相似问题