首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用extraOptimizations转换Spark

用extraOptimizations转换Spark
EN

Stack Overflow用户
提问于 2016-10-25 08:55:00
回答 2查看 1.2K关注 0票数 2

我希望将SQL字符串作为用户输入,然后在执行之前对其进行转换。特别是,我希望修改顶层投影(select子句),注入要由查询检索的额外列。

我希望通过使用sparkSession.experimental.extraOptimizations连接到催化剂上来实现这一点。我知道我尝试的不是严格地说是优化(转换改变了SQL语句的语义),但是API似乎仍然合适。但是,查询执行器似乎忽略了我的转换。

下面是一个很小的例子来说明我所面临的问题。首先定义一个行案例类:

代码语言:javascript
复制
case class TestRow(a: Int, b: Int, c: Int)

然后定义一个简单地丢弃任何投影的优化规则:

代码语言:javascript
复制
object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        case x: Project => x.child
    }
}

现在创建一个数据集,注册优化,并运行一个SQL查询:

代码语言:javascript
复制
// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations =  
    Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a = 1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

这是输出:

代码语言:javascript
复制
Query result: 
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
      +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

我们看到,结果与原始SQL语句的结果相同,没有应用转换。然而,当打印逻辑和物理计划时,投影确实被删除了。我还确认(通过调试日志输出)确实正在调用转换。

对这里发生了什么有什么建议吗?也许优化器只是忽略了改变语义的“优化”?

如果使用优化不是一条路,谁能建议另一种选择呢?我真正想做的就是解析输入SQL语句,对其进行转换,并将转换后的AST传递给Spark执行。但据我所见,用于执行此操作的API对于星火sql包是私有的。也许可以使用反射,但我想避免这种情况。

任何指示都将不胜感激。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-10-26 23:52:31

正如您所猜测的,这是无法工作的,因为我们假设优化器不会更改查询的结果。

具体来说,我们缓存分析器产生的模式(并假设优化器不会更改它)。在将行转换为外部格式时,我们使用此模式,从而截断结果中的列。如果您做的不仅仅是截断(即更改的数据类型),这甚至可能会崩溃。

正如您在这个笔记本中所看到的,它实际上正在产生您在幕后所期望的结果。我们计划在不久的将来打开更多的钩子,这将允许您在查询执行的其他阶段修改计划。有关更多详细信息,请参阅火花-18127

票数 5
EN

Stack Overflow用户

发布于 2016-10-27 09:43:41

Michael的回答证实了这种转变不应该通过优化来完成。

相反,我在Spark中使用了内部API来实现我目前想要的转换。它需要的方法是包-私有在星火。因此,我们可以通过在适当的包中放置相关的逻辑来访问它们,而无需考虑。大纲:

代码语言:javascript
复制
// Must be in the spark.sql package.
package org.apache.spark.sql

object SQLTransformer {
    def apply(sparkSession: SparkSession, ...) = {

        // Get the AST.
        val ast = sparkSession.sessionState.sqlParser.parsePlan(sql)

        // Transform the AST.
        val transformedAST = ast match {
            case node: Project => // Modify any top-level projection 
            ...
        }

        // Create a dataset directly from the AST.
        Dataset.ofRows(sparkSession, transformedAST)
    }
}

请注意,这当然可能与未来版本的星火中断。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40235566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档