首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在PySpark中在一天内累积超过1小时的窗口

如何在PySpark中在一天内累积超过1小时的窗口
EN

Stack Overflow用户
提问于 2018-01-17 13:16:11
回答 1查看 5.3K关注 0票数 3

我有一个火花DataFrame,如下所示:

代码语言:javascript
复制
+---------+--------------------------+
|group_id |event_time                |
+---------+--------------------------+
|XXXX     |2017-10-25 14:47:02.717013|
|XXXX     |2017-10-25 14:47:25.444979|
|XXXX     |2017-10-25 14:49:32.21353 |
|YYYY     |2017-10-25 14:50:38.321134|
|YYYY     |2017-10-25 14:51:12.028447|
|ZZZZ     |2017-10-25 14:51:24.810688|
|YYYY     |2017-10-25 14:37:34.241097|
|ZZZZ     |2017-10-25 14:37:24.427836|
|XXXX     |2017-10-25 14:37:24.620864|
|YYYY     |2017-10-25 14:37:24.964614|
+---------+--------------------------+

我想要计算每一天内每小时的滚动事件计数( group_id )。

因此,对于datetime 25-10 14:00group_id,我想要计算从25-10 00:0025-10 14:00group_id的事件计数。

做以下事情:

代码语言:javascript
复制
df.groupBy('group_id', window('event_time', '1 hour').alias('model_window')) \
    .agg(dfcount(lit(1)).alias('values'))

计算每小时的事件数,但不计算每天累积的事件数。

有什么想法吗?

编辑:预期的输出如下所示:

代码语言:javascript
复制
    +---------+---------------------------------------------+-------+
    |group_id |model_window                                 |values |         
    +---------+---------------------------------------------+-------+
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 10    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 17    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 22    |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 0     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 1     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 9     |
    +---------+---------------------------------------------+-------+
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-01-17 13:40:29

想计算一下..。每group_id每天一小时。

提取数据和小时:

代码语言:javascript
复制
from pyspark.sql.functions import col, count, hour, sum

extended = (df
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withColumn("date", col("event_time").cast("date"))
  .withColumn("hour", hour(col("event_time"))))

计算聚合

代码语言:javascript
复制
aggs = extended.groupBy("group_id", "date", "hour").count()

我想计算事件的滚动计数。

并使用窗口函数:

代码语言:javascript
复制
from pyspark.sql.window import Window

aggs.withColumn(
    "agg_count", 
    sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour")))

要获得缺少间隔的0,您必须为每个日期和时间生成引用数据,并加入它。

df定义为:

代码语言:javascript
复制
df = sc.parallelize([
    ("XXXX", "2017-10-25 01:47:02.717013"),
    ("XXXX", "2017-10-25 14:47:25.444979"),
    ("XXXX", "2017-10-25 14:49:32.21353"),
    ("YYYY", "2017-10-25 14:50:38.321134"),
    ("YYYY", "2017-10-25 14:51:12.028447"),
    ("ZZZZ", "2017-10-25 14:51:24.810688"),
    ("YYYY", "2017-10-25 14:37:34.241097"),
    ("ZZZZ", "2017-10-25 14:37:24.427836"),
    ("XXXX", "2017-10-25 22:37:24.620864"),
    ("YYYY", "2017-10-25 16:37:24.964614")
]).toDF(["group_id", "event_time"])

结果是

代码语言:javascript
复制
+--------+----------+----+-----+---------+                                      
|group_id|      date|hour|count|agg_count|
+--------+----------+----+-----+---------+
|    XXXX|2017-10-25|   1|    1|        1|
|    XXXX|2017-10-25|  14|    2|        3|
|    XXXX|2017-10-25|  22|    1|        4|
|    ZZZZ|2017-10-25|  14|    2|        2|
|    YYYY|2017-10-25|  14|    3|        3|
|    YYYY|2017-10-25|  16|    1|        4|
+--------+----------+----+-----+---------+
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48302090

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档