首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >跳过的阶段对Spark作业有性能影响吗?

跳过的阶段对Spark作业有性能影响吗?
EN

Stack Overflow用户
提问于 2020-04-14 18:57:18
回答 1查看 392关注 0票数 0

我正在运行一个spark结构的流作业,其中包括创建一个空数据帧,使用每个微批处理更新它,如下所示。每次微批处理执行时,阶段数增加4。为了避免重新计算,我在循环内的每个更新之后将更新后的StaticDF持久化到内存中。这有助于跳过每个新的微批次创建的那些额外的阶段。

我的问题是-

1)即使总的完成阶段保持不变,增加的阶段总是被跳过,但这是否会导致性能问题,因为在某个时间点上可能会有数百万个跳过的阶段?

2)当某些缓存的RDD部分或全部不可用时会发生什么?(节点/执行器故障)。Spark文档说,到目前为止,它还没有实现从多个微批次接收的所有数据,所以这是否意味着它将需要再次从Kafka读取所有事件来重新生成staticDF?

代码语言:javascript
复制
// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
      .add("product_id", LongType)
      .add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)

// Note : streamingDF was created from Kafka source
    streamingDF.writeStream
      .trigger(Trigger.ProcessingTime(10000L))
      .foreachBatch {
        (micro_batch_DF: DataFrame) => {

        // fetching max created_at for each product_id in current micro-batch
          val staging_df = micro_batch_DF.groupBy("product_id")
            .agg(max("created").alias("created"))

          // Updating staticDF using current micro batch
          staticDF = staticDF.unionByName(staging_df)
          staticDF = staticDF
            .withColumn("rnk",
              row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
            ).filter("rnk = 1")
            .drop("rnk")
              .cache()

          }

EN

回答 1

Stack Overflow用户

发布于 2020-04-23 15:51:11

即使跳过的阶段不需要任何计算,但我的作业在一定数量的批处理后开始失败。这是因为DAG在每次批处理执行时都会增长,使其无法管理,并抛出堆栈溢出异常。

为了避免这种情况,我不得不打破火花谱系,这样阶段的数量就不会随着每次运行而增加(即使它们被跳过了)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61206084

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档