Whether I use Spark directly or the spark-shell, I don't know how to explicitly inspect what Spark's Catalyst query optimizer is doing.
For example, suppose I create a HiveContext as follows:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Then, when I run a query such as:
hiveContext.sql("""
| SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
| FROM date_dim dt, store_sales, item
| WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
| AND store_sales.ss_item_sk = item.i_item_sk
| AND item.i_manufact_id = 128
| AND dt.d_moy=11
| GROUP BY dt.d_year, item.i_brand, item.i_brand_id
| ORDER BY dt.d_year, sum_agg desc, brand_id
| LIMIT 100
""").collect().foreach(println)

is there a way to check whether the Catalyst optimizer is in effect? And if it is not, how do I enable the Catalyst optimizer for a HiveContext?
Posted on 2016-09-23 12:41:23
The Catalyst query optimizer is always enabled in Spark 2.0. Its optimizations come for free whenever you use Spark 2.0's Datasets (and that is one of the many reasons you should use Datasets rather than RDDs).
If you want to see which optimizations the Catalyst query optimizer applies to your query, set the logging level for SparkOptimizer to TRACE in conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE

With this in place, every time you trigger the execution of a query (via show, collect, or simply explain), you will see plenty of log output from the Catalyst query optimizer doing its work for you.
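Besides the logs, the plans are also available programmatically: every Dataset/DataFrame exposes a QueryExecution object holding the plan after each Catalyst phase. A minimal sketch, assuming Spark 2.x on the classpath (the local SparkSession and the toy DataFrame are purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  .appName("catalyst-plans")
  .getOrCreate()
import spark.implicits._

// A toy query that only needs the `name` column
val q = Seq((1, "Jacek"), (2, "Agata")).toDF("id", "name").select("name")

// The plan at each Catalyst phase:
println(q.queryExecution.logical)       // parsed logical plan
println(q.queryExecution.analyzed)      // analyzed logical plan
println(q.queryExecution.optimizedPlan) // plan after the optimizer rules
println(q.queryExecution.executedPlan)  // physical plan
```

Comparing `analyzed` against `optimizedPlan` shows you directly what the optimizer changed, without touching any logging configuration.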
Let's see the column-pruning optimization rule in action.
// the business object
case class Person(id: Long, name: String, city: String)
// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS
// the query
// Note that we work with names only (out of 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count
scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]
== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
+- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
         +- LocalTableScan [name#126]

Source: https://stackoverflow.com/questions/37793560
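Another way to confirm the optimizer is running is to hook your own rule into it. Spark 2.x exposes an experimental extension point, spark.experimental.extraOptimizations, which appends user-provided rules to the optimizer's batches. A minimal sketch (the rule name and the println are illustrative; a real rule would rewrite the plan):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

val spark = SparkSession.builder
  .master("local[*]")
  .appName("custom-optimizer-rule")
  .getOrCreate()
import spark.implicits._

// A do-nothing rule that just reports every logical plan the optimizer hands it.
object TraceRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    println(s"TraceRule invoked on:\n$plan")
    plan // return the plan unchanged
  }
}

// Register the rule with the optimizer.
spark.experimental.extraOptimizations = Seq(TraceRule)

// Any action now runs the optimizer, and TraceRule prints the plans it sees.
val n = spark.range(5).filter('id > 2).count()
```

If TraceRule's output appears when the action runs, the Catalyst optimizer is demonstrably active for your session.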