首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >何时缓存DataFrame?

何时缓存DataFrame?
EN

Stack Overflow用户
提问于 2017-05-24 18:57:11
回答 3查看 51.9K关注 0票数 32

我的问题是,什么时候应该做dataframe.cache(),什么时候有用?

另外,在我的代码中,我应该在注释行中缓存数据帧吗?

注意:我的数据帧是从Redshift DB加载的。

非常感谢

下面是我的代码:

代码语言:javascript
复制
def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    #SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta
                                                                  / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                             func.lit(EUR),
                                                                             dataframe.creation_date,
                                                                             dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
    ######################################         
    #SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################
    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
        withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-05-24 19:18:15

我应该什么时候做dataframe.cache(),什么时候有用?

对要跨查询使用的内容进行cache (早期且经常达到可用内存)。使用哪种编程语言(Python或Scala或Java、SQL或R)实际上并不重要,因为底层机制是相同的。

您可以使用explain运算符查看物理计划中是否缓存了DataFrame (其中InMemoryRelation实体反映缓存的数据集及其存储级别):

代码语言:javascript
复制
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

cache (或persist)您的DataFrame之后,第一个查询可能会变得更慢,但它将为以下查询带来回报。

您可以使用以下代码检查数据集是否已缓存:

代码语言:javascript
复制
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false

另外,在我的代码中,我应该在注释行中缓存数据帧吗?

是也不是。缓存表示外部数据集的内容,这样您就不会在每次查询数据时通过网络传输数据(同时访问外部存储)而付出额外的代价。

不要缓存只使用一次或易于计算的内容。否则,返回cache

注意缓存的内容,即缓存的Dataset,因为它提供了不同的缓存查询。

代码语言:javascript
复制
// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)

Spark SQL中的缓存有一个令人惊讶的地方。缓存是惰性的,这就是为什么您要在第一个操作时就为缓存行付出额外的代价,但这只会发生在DataFrame应用编程接口中。在SQL中,缓存是急切的,这在查询性能上有很大的不同,因为您不需要调用操作来触发缓存。

票数 38
EN

Stack Overflow用户

发布于 2018-01-04 23:43:33

实际上,在您的情况下,.cache()根本帮不上忙。您没有对您的(至少不是在您提供的函数中) dataframe执行任何操作。如果您要多次使用数据,则.cache()是一个好主意:

代码语言:javascript
复制
data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()

这样,您将只获取一次数据(当第一个操作被称为.show(),然后下一次使用data dataframe时应该会更快。但是,使用时要小心-有时再次获取数据会更快。此外,我建议不要一次又一次地给数据帧命名相同的名称。毕竟,Dataframe是不可变的对象。

希望这能对你有所帮助。

票数 19
EN

Stack Overflow用户

发布于 2019-05-08 19:57:22

在Spark中缓存RDD :这是一种加速多次访问同一RDD的应用程序的机制。未缓存或未设置检查点的RDD在每次调用该RDD上的操作时都会再次进行评估。缓存RDD有两个函数调用:cache()persist(level: StorageLevel)。它们之间的区别是,cache()会将RDD缓存到内存中,而persist(level)可以根据level指定的缓存策略在内存、磁盘或堆外内存中缓存。不带参数的persist()等同于cache()。我们将在这篇文章的后面讨论缓存策略。从存储内存中释放空间由unpersist()执行。

何时使用缓存:正如本文所建议的,建议在以下情况下使用缓存:

在迭代机器学习中重新使用Spark applications

  • RDD RDD重复使用在独立的Spark applications

  • When RDD计算中重新使用
  • RDD是昂贵的,缓存可以帮助在一个执行器失败的情况下降低恢复成本
票数 12
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44156365

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档