首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >窗口函数需要HiveContext

窗口函数需要HiveContext
EN

Stack Overflow用户
提问于 2019-04-11 12:44:21
回答 1查看 479关注 0票数 0

我有以下scala代码来从Spark中拉取数据:

代码语言:javascript
复制
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.functions._

    val emailDF = loadTable("email")
              .where(s"topic = '${Topics.Email}'")
              .cache()

    val df = emailDF.withColumn("rank",row_number()
              .over(Window.partitionBy("email_address")
                          .orderBy(desc("created_at"))))

    val resultDf = df.filter(s"rank == 1").drop("rank")

当运行代码时,我得到了这个错误:

代码语言:javascript
复制
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

搜索了一下,发现我需要添加配置单元依赖项,下面是我更新后的依赖项:

代码语言:javascript
复制
    build.sbt
    val sparkVersion = "1.6.3" 
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
      "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
    )

然而,我仍然得到了相同的错误。

尝试了hiveContext方法:

代码语言:javascript
复制
        val emailDF = Email.load()
          .filter(col(Email.TopicId).isin(Topics.Email))
          .filter(col(Email.OptIn).isin(optInFlag))
          .cache()

        val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
        logger.info(s"sc: ${sc.appName}, ${sc.sparkUser}")
        emailDF.registerTempTable("emailDFTable")

        val df = hiveContext.sql("""SELECT *,
                                    row_number() over(partition by email_address order by event_at desc) AS rank
                             FROM emailDFTable""")

        val resultDf = df.filter(s"rank == 1").drop("rank")

现在我得到了错误:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: emailDFTable; line 3 pos 30
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)

另一种我尝试过的方法:

代码语言:javascript
复制
val windowSpec = Window.partitionBy(col(EmailChannel.EmailAddress)).orderBy(col(EmailChannel.EventAt).desc)
    val resultDf = emailDF.withColumn("maxEventAt", first("event_at").over(windowSpec))
      .select("*").where(col("maxEventAt") === col(EmailChannel.EventAt))
      .drop("maxEventAt")

然后又得到了类似的错误:

代码语言:javascript
复制
org.apache.spark.sql.AnalysisException: Could not resolve window function 'first_value'. Note that, using window functions currently requires a HiveContext;

我真的不明白我为什么要导入hiveContext并添加spark-hive依赖,为什么它不能工作。我能想到的一件事是我们使用datastax spark,因此我们在build.sbt中有以下依赖项

代码语言:javascript
复制
  "com.datastax.spark"  %% "spark-cassandra-connector" % "1.6.11",

我也需要一个datastax.spark.hive吗?但是没有看到这样的库存在。

我还显示了我的emailDF: emailDF.show(false),它有很多数据,而不是空的。

====更新====

是的,切换到HiveContext工作,我没有注意到在代码开头初始化了SparkContext和SQLContext,而不是使用HiveContext切换SQLContext,我尝试在SparkContext之外创建一个新的HiveContext:

代码语言:javascript
复制
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

这就是它不起作用的原因。在我将SQLContext更改为HiveContext后,它工作正常。

已更改为

代码语言:javascript
复制
  implicit val sc: SparkContext       = new SparkContext(sparkConfig)
  implicit val sqlContext: SQLContext = new SQLContext(sc)

代码语言:javascript
复制
 implicit val sc: SparkContext        = new SparkContext(sparkConfig)
 implicit val sqlContext: HiveContext = new HiveContext(sc)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-11 21:27:54

在Spark1.6中,窗口功能仅在HiveContext中可用。

使用sparkContext(sc)创建hiveContext。

代码语言:javascript
复制
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

将数据帧注册为临时表,并使用hiveContext对临时表运行查询。

代码语言:javascript
复制
emailDF.registerTempTable("emailDFTable")

一旦数据帧注册为临时表,请检查您临时表。

代码语言:javascript
复制
hiveContext.sql("SHOW tables").show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|        |emaildftable|       true|
+--------+------------+-----------+

现在可以查询临时表了。

代码语言:javascript
复制
val df = hiveContext.sql("""SELECT *,
                                row_number() over(partition by email_address order by created_at desc) AS rank
                         FROM emailDFTable""")

让我知道进展如何。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55624865

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档