我的问题是,什么时候应该做dataframe.cache(),什么时候有用?
另外,在我的代码中,我应该在注释行中缓存数据帧吗?
注意:我的数据帧是从Redshift DB加载的。
非常感谢
下面是我的代码:
def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])
dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
.filter(dataframe.seq_reserva.isin(seq_reservas))
##################################################
#SHOULD I CACHE HERE df_vta, df_cpa and dataframe
##################################################
dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
"cod_esquema_vta", "cod_emp_atlas_vta") \
.join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
"cod_esquema_cpa", "cod_emp_atlas_cpa") \
.select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
"imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")
######################################
#SHOULD I CACHE HERE dataframe AGAIN ?
######################################
dataframe = dataframe.withColumn("amount1",
func.when(dataframe.ind_tipo_regimen_fac == 'E',
dataframe.imp_margen_canal * (
1 - (1 / (1 + (dataframe.pct_impuesto_vta
/ 100)))))
.otherwise(dataframe.imp_venta * (
1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
dataframe.imp_venta - dataframe.imp_margen_canal) * (
1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))
dataframe = dataframe.withColumn("amount2",
func.when(dataframe.ind_tipo_regimen_con == 'E',
dataframe.imp_margen_canco * (
1 - (1 / (1 + (dataframe.pct_impuesto_vta
/ 100)))))
.otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
dataframe.imp_coste) * (
1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))
dataframe = dataframe.na.fill({'amount1': 0})
dataframe = dataframe.na.fill({'amount2': 0})
dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
dataframe.seq_reserva == df_aux.booking_id])
dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
func.lit(EUR),
dataframe.creation_date,
dataframe.amount1))
dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
func.lit(EUR),
dataframe.creation_date,
dataframe.amount2))
dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)
dataframe = dataframe.na.fill({'impuesto_canco': 0})
dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")
######################################
#SHOULD I CACHE HERE dataframe AGAIN ?
######################################
dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}). \
withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")
return dataframe发布于 2017-05-24 19:18:15
我应该什么时候做dataframe.cache(),什么时候有用?
对要跨查询使用的内容进行cache (早期且经常达到可用内存)。使用哪种编程语言(Python或Scala或Java、SQL或R)实际上并不重要,因为底层机制是相同的。
您可以使用explain运算符查看物理计划中是否缓存了DataFrame (其中InMemoryRelation实体反映缓存的数据集及其存储级别):
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))在cache (或persist)您的DataFrame之后,第一个查询可能会变得更慢,但它将为以下查询带来回报。
您可以使用以下代码检查数据集是否已缓存:
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false另外,在我的代码中,我应该在注释行中缓存数据帧吗?
是也不是。缓存表示外部数据集的内容,这样您就不会在每次查询数据时通过网络传输数据(同时访问外部存储)而付出额外的代价。
不要缓存只使用一次或易于计算的内容。否则,返回cache。
注意缓存的内容,即缓存的Dataset,因为它提供了不同的缓存查询。
// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")
scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 5, step=1, splits=8)
// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
+- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Filter ((id#17L % 2) = 0)
+- *Range (0, 1, step=1, splits=8)Spark SQL中的缓存有一个令人惊讶的地方。缓存是惰性的,这就是为什么您要在第一个操作时就为缓存行付出额外的代价,但这只会发生在DataFrame应用编程接口中。在SQL中,缓存是急切的,这在查询性能上有很大的不同,因为您不需要调用操作来触发缓存。
发布于 2018-01-04 23:43:33
实际上,在您的情况下,.cache()根本帮不上忙。您没有对您的(至少不是在您提供的函数中) dataframe执行任何操作。如果您要多次使用数据,则.cache()是一个好主意:
data = sub_tax_transfer_pricing_eur_aux(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()这样,您将只获取一次数据(当第一个操作被称为.show(),然后下一次使用data dataframe时应该会更快。但是,使用时要小心-有时再次获取数据会更快。此外,我建议不要一次又一次地给数据帧命名相同的名称。毕竟,Dataframe是不可变的对象。
希望这能对你有所帮助。
发布于 2019-05-08 19:57:22
在Spark中缓存RDD :这是一种加速多次访问同一RDD的应用程序的机制。未缓存或未设置检查点的RDD在每次调用该RDD上的操作时都会再次进行评估。缓存RDD有两个函数调用:cache()和persist(level: StorageLevel)。它们之间的区别是,cache()会将RDD缓存到内存中,而persist(level)可以根据level指定的缓存策略在内存、磁盘或堆外内存中缓存。不带参数的persist()等同于cache()。我们将在这篇文章的后面讨论缓存策略。从存储内存中释放空间由unpersist()执行。
何时使用缓存:正如本文所建议的,建议在以下情况下使用缓存:
在迭代机器学习中重新使用Spark applications
https://stackoverflow.com/questions/44156365
复制相似问题