首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SparkSQL -滞后函数?

SparkSQL -滞后函数?
EN

Stack Overflow用户
提问于 2015-09-14 15:47:39
回答 1查看 12K关注 0票数 5

我在这个DataBricks邮政中看到,在SparkSql中有对窗口函数的支持,特别是我正在尝试使用lag()窗口函数。

我有几行信用卡事务,我已经对它们进行了排序,现在我想迭代这些行,并为每一行显示事务的数量,以及当前行的金额和上一行的金额的差异。

在DataBricks帖子之后,我想出了这个查询,但它向我抛出了一个异常,我无法完全理解为什么。

这在PySpark..。tx是我已经在临时表注册时创建的数据。

代码语言:javascript
复制
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

和异常(截断)..

代码语言:javascript
复制
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

我真的很有洞察力,这个功能是相当新的,并且没有太多的东西可以继续到现有的例子或者其他相关的文章中去。

编辑

我也尝试过在没有SQL语句的情况下这样做,但是继续得到一个错误。我在Hive和SQLContext中使用了这个,并收到了同样的错误。

代码语言:javascript
复制
windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()
代码语言:javascript
复制
Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-09-14 16:15:31

  1. 框架规范应该以关键字ROWS而不是ROW开头。
  2. 帧规范需要下限值。 前一行和当前行之间的行 或UNBOUNDED关键字 前一行和当前行之间的行
  3. LAG函数根本不接受框架,所以一个正确的带有滞后的SQL查询可以如下所示 从tx中选择tx.cc_num、tx.trans_date、tx.trans_time、tx.amt、tx.amt(tx.amt) OVER ( tx.cc_num ORDER BY tx.trans_date,tx.trans_time )作为prev_amt

编辑

关于SQL的使用:

  1. ,因为您可以在错误消息中读取 注意,使用窗口函数当前需要一个HiveContex

确保使用sqlContext (而不是SQLContext )初始化HiveContext

  1. windowSpec.rowsBetween(-1, 0)什么也不做,但是lag函数同样不支持框架规范。
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32568854

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档