我在文件test.sql中有一个Spark查询-
CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc
CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1
select * from VIEW_2现在,我启动我的火花壳,然后试着这样执行-
val sql = scala.io.Source.fromFile("test.sql").mkString
spark.sql(sql).show如果出现以下错误,这将失败-
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {<EOF>, 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OR', 'AND', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 128)我尝试在不同的spark.sql语句中1比1执行这些查询,并且运行良好。问题是,我有6-7个查询来创建临时视图,最后我需要从我的最后一个视图中输出。是否有一种方法可以在一个spark.sql语句中运行这些SQL。我研究过Postgres (Redshift),它能够执行这样的查询。在spark sql中,在这种情况下,我需要维护大量的文件。
发布于 2018-03-10 19:39:38
问题是,mkString将单个字符串中的所有行连接在一起,不能正确地将其解析为有效的SQL查询。
脚本文件中的每一行都应该作为一个单独的查询执行,例如:
scala.io.Source.fromFile("test.sql").getLines()
.filterNot(_.isEmpty) // filter out empty lines
.foreach(query =>
spark.sql(query).show
)更新
如果在多行上拆分查询,则情况会更复杂一些。
我们绝对需要有一个标记查询结束的令牌。让它是分号字符,就像在标准SQL中一样。
首先,我们从源文件中收集所有非空行:
val lines = scala.io.Source.fromFile(sqlFile).getLines().filterNot(_.isEmpty)然后,我们处理收集的行,如果没有以分号结尾,则将每个新行与前一行连接起来:
val queries = lines.foldLeft(List[String]()) { case(queries, line) =>
queries match {
case Nil => List(line) // case for the very first line
case init :+ last =>
if (last.endsWith(";")) {
// if a query ended on a previous line, we simply append the new line to the list of queries
queries :+ line.trim
} else {
// the query is not terminated yet, concatenate the line with the previous one
val queryWithNextLine = last + " " + line.trim
init :+ queryWithNextLine
}
}
}https://stackoverflow.com/questions/49212433
复制相似问题