首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在PySpark数据帧中的选定时间间隔内复制日期间隔之间的记录

在PySpark数据帧中的选定时间间隔内复制日期间隔之间的记录
EN

Stack Overflow用户
提问于 2020-12-11 06:46:17
回答 2查看 1K关注 0票数 1

我有一个PySpark数据,它可以跟踪产品价格和状态在几个月内发生的变化。这意味着只有在与前一个月相比发生更改(无论是状态还是价格)时才创建新行,如下面的虚拟数据所示。

代码语言:javascript
复制
    ----------------------------------------
    |product_id| status    | price| month  |
    ----------------------------------------
    |1         | available | 5    | 2019-10|
    ----------------------------------------
    |1         | available | 8    | 2020-08|
    ----------------------------------------
    |1         | limited   | 8    | 2020-10|
    ----------------------------------------
    |2         | limited   | 1    | 2020-09|
    ----------------------------------------
    |2         | limited   | 3    | 2020-10|
    ----------------------------------------

我想要创建一个显示过去6个月中每一个值的数据。这意味着,每当在上面的数据中出现空白时,我就需要复制记录。例如,如果最后6个月是2020-07,2020-08,. 2020-12,那么上述数据的结果应该是

代码语言:javascript
复制
    ----------------------------------------
    |product_id| status    | price| month  |
    ----------------------------------------
    |1         | available | 5    | 2020-07|
    ----------------------------------------
    |1         | available | 8    | 2020-08|
    ----------------------------------------
    |1         | available | 8    | 2020-09|
    ----------------------------------------
    |1         | limited   | 8    | 2020-10|
    ----------------------------------------
    |1         | limited   | 8    | 2020-11|
    ----------------------------------------
    |1         | limited   | 8    | 2020-12|
    ----------------------------------------
    |2         | limited   | 1    | 2020-09|
    ----------------------------------------
    |2         | limited   | 3    | 2020-10|
    ----------------------------------------
    |2         | limited   | 3    | 2020-11|
    ----------------------------------------
    |2         | limited   | 3    | 2020-12|
    ----------------------------------------

请注意,对于product_id =1,在2019-10年有一个较旧的记录,该记录一直传播到2020-08,然后被裁剪,而对于product_id =2,在2020-09之前没有记录,因此2020-07,2020-08月份没有填充(因为该产品在2020-09之前并不存在)。

由于数据文件由数百万条记录组成,所以使用循环和检查每个product_id的“蛮力”解决方案相当缓慢。使用窗口函数解决这个问题似乎是可能的,方法是创建另一个列next_month,然后根据该列填充空白,但我不知道如何实现这一点。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-16 09:56:13

关于@jxc注释,我已经为这个用例准备了答案。

下面是代码片段。

  1. 导入spark函数

from pyspark.sql import functions as F, Window

  1. 准备示例数据

代码语言:javascript
复制
    simpleData = ((1,"Available",5,"2020-07"),                                                              
    (1,"Available",8,"2020-08"),                                           
    (1,"Limited",8,"2020-12"),                                           
    (2,"Limited",1,"2020-09"),                                          
    (2,"Limited",3,"2020-12")
    )

    
    columns= ["product_id", "status", "price", "month"]

  1. 创建示例数据的数据格式

df = spark.createDataFrame(data = simpleData, schema = columns)

  1. 在dataframe中添加日期列以获得正确的格式化日期

代码语言:javascript
复制
    df0 = df.withColumn("date",F.to_date('month','yyyy-MM'))

    df0.show()

    +----------+---------+-----+-------+----------+                                          
    |product_id|   status|price|  month|      date|                                               
    +----------+---------+-----+-------+----------+                                                
    |         1|Available|    5|2020-07|2020-07-01|                                                 
    |         1|Available|    8|2020-08|2020-08-01|                                                
    |         1|  Limited|    8|2020-12|2020-12-01|                                                
    |         2|  Limited|    1|2020-09|2020-09-01|                                                
    |         2|  Limited|    3|2020-12|2020-12-01|                                                
    +----------+---------+-----+-------+----------+

  1. 创建WinSpec w1并使用窗口聚合函数引导查找下一个日期结束(W1),将其转换为前几个月以设置日期序列:

代码语言:javascript
复制
    w1 = Window.partitionBy('product_id').orderBy('date')
    df1 = df0.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df1.show()

    +----------+---------+-----+-------+----------+----------+                                                                  
    |product_id|   status|price|  month|      date|  end_date|                                                      
    +----------+---------+-----+-------+----------+----------+                                              
    |         1|Available|    5|2020-07|2020-07-01|2020-07-01|                                                      
    |         1|Available|    8|2020-08|2020-08-01|2020-11-01|                                                            
    |         1|  Limited|    8|2020-12|2020-12-01|2020-12-01|                                                                     
    |         2|  Limited|    1|2020-09|2020-09-01|2020-11-01|                                                                            
    |         2|  Limited|    3|2020-12|2020-12-01|2020-12-01|                                                                                   
    +----------+---------+-----+-------+----------+----------+

  1. 使用months_between(end_date,date)计算两个日期之间月份的#,并使用转换函数迭代序列(0,# months ),用date=add_months(date,i)和price=IF(i=0,价格,价格)创建named_struct,使用inline_outer爆炸结构数组。

代码语言:javascript
复制
    df2 = df1.selectExpr("product_id", "status", inline_outer( transform( sequence(0,int(months_between(end_date, date)),1), i -> (add_months(date,i) as date, IF(i=0,price,price) as price) ) ) )

    df2.show()

    +----------+---------+----------+-----+                                                    
    |product_id|   status|      date|price|                                                             
    +----------+---------+----------+-----+                                                              
    |         1|Available|2020-07-01|    5|                                                              
    |         1|Available|2020-08-01|    8|                                                  
    |         1|Available|2020-09-01|    8|                                                           
    |         1|Available|2020-10-01|    8|                                                             
    |         1|Available|2020-11-01|    8|                                                                 
    |         1|  Limited|2020-12-01|    8|                                                                
    |         2|  Limited|2020-09-01|    1|                                                                                 
    |         2|  Limited|2020-10-01|    1|                                                    
    |         2|  Limited|2020-11-01|    1|                                                                          
    |         2|  Limited|2020-12-01|    3|                                                          
    +----------+---------+----------+-----+                    

  1. Partitioning product_id上的dataframe并在df3中添加一个秩列以获得每一行的行号。然后,为每个df4

使用新列max_rank存储rank列值的最大值,并将max_rank存储到product_id中。

代码语言:javascript
复制
    w2 = Window.partitionBy('product_id').orderBy('date')                                                            
    df3 = df2.withColumn('rank',F.row_number().over(w2))                                                                 
    Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
    df3.show()
    +----------+---------+----------+-----+----+
    |product_id|   status|      date|price|rank|
    +----------+---------+----------+-----+----+
    |         1|Available|2020-07-01|    5|   1|
    |         1|Available|2020-08-01|    8|   2|
    |         1|Available|2020-09-01|    8|   3|
    |         1|Available|2020-10-01|    8|   4|
    |         1|Available|2020-11-01|    8|   5|
    |         1|  Limited|2020-12-01|    8|   6|
    |         2|  Limited|2020-09-01|    1|   1|
    |         2|  Limited|2020-10-01|    1|   2|
    |         2|  Limited|2020-11-01|    1|   3|
    |         2|  Limited|2020-12-01|    3|   4|
    +----------+---------+----------+-----+----+ 

                                                                                                           
    df4 = df3.groupBy("product_id").agg(F.max('rank').alias('max_rank'))                                                           
    Schema: DataFrame[product_id: bigint, max_rank: int]
    df4.show()
    +----------+--------+
    |product_id|max_rank|
    +----------+--------+
    |         1|       6|
    |         2|       4|
    +----------+--------+

在get max_rank上加入df3df4数据的

代码语言:javascript
复制
    df5 = df3.join(df4,df3.product_id == df4.product_id,"inner") \
             .select(df3.product_id,df3.status,df3.date,df3.price,df3.rank,df4.max_rank)                                                                                          
    Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int, max_rank: int]
    df5.show()
    +----------+---------+----------+-----+----+--------+
    |product_id|   status|      date|price|rank|max_rank|
    +----------+---------+----------+-----+----+--------+
    |         1|Available|2020-07-01|    5|   1|       6|
    |         1|Available|2020-08-01|    8|   2|       6|
    |         1|Available|2020-09-01|    8|   3|       6|
    |         1|Available|2020-10-01|    8|   4|       6|
    |         1|Available|2020-11-01|    8|   5|       6|
    |         1|  Limited|2020-12-01|    8|   6|       6|
    |         2|  Limited|2020-09-01|    1|   1|       4|
    |         2|  Limited|2020-10-01|    1|   2|       4|
    |         2|  Limited|2020-11-01|    1|   3|       4|
    |         2|  Limited|2020-12-01|    3|   4|       4|
    +----------+---------+----------+-----+----+--------+

然后,

  1. 使用between函数对df5数据进行过滤,以获得最新的6个月数据。

代码语言:javascript
复制
    FinalResultDF = df5.filter(F.col('rank') \                                      
                         .between(F.when((F.col('max_rank') > 5),(F.col('max_rank')-6)).otherwise(0),F.col('max_rank'))) \
                         .select(df5.product_id,df5.status,df5.date,df5.price)

    FinalResultDF.show(truncate=False)   
代码语言:javascript
复制
    +----------+---------+----------+-----+                                               
    |product_id|status   |date      |price|                                                
    +----------+---------+----------+-----+                                                               
    |1         |Available|2020-07-01|5    |                                                                                
    |1         |Available|2020-08-01|8    |                                                                                          
    |1         |Available|2020-09-01|8    |                                                                                                           
    |1         |Available|2020-10-01|8    |                                                                                                             
    |1         |Available|2020-11-01|8    |                                                                                                               
    |1         |Limited  |2020-12-01|8    |                                                                                                                     
    |2         |Limited  |2020-09-01|1    |                                                                                                                     
    |2         |Limited  |2020-10-01|1    |                                                                                                                        
    |2         |Limited  |2020-11-01|1    |                                                                                                                      
    |2         |Limited  |2020-12-01|3    |                                                                                                         
    +----------+---------+----------+-----+
票数 1
EN

Stack Overflow用户

发布于 2020-12-17 12:07:26

使用spark sql:

给定输入数据:

代码语言:javascript
复制
val df = spark.sql(""" with t1 (
 select  1 c1,   'available' c2, 5 c3,   '2019-10' c4  union all
 select  1 c1,   'available' c2, 8 c3,   '2020-08' c4  union all
 select  1 c1,   'limited' c2, 8 c3,   '2020-10' c4  union all
 select  2 c1,   'limited' c2, 1 c3,   '2020-09' c4  union all
 select  2 c1,   'limited' c2, 3 c3,   '2020-10' c4 
  )  select   c1  product_id,   c2   status    ,   c3   price,   c4  month      from t1
""")

df.createOrReplaceTempView("df")
df.show(false)

+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|5    |2019-10|
|1         |available|8    |2020-08|
|1         |limited  |8    |2020-10|
|2         |limited  |1    |2020-09|
|2         |limited  |3    |2020-10|
+----------+---------+-----+-------+

筛选日期窗口,即从2020-07到2020-12的6个月,并将它们存储在df1中

代码语言:javascript
复制
val df1 = spark.sql("""
select * from df where month > '2020-07' and month < '2020-12' 
""")
df1.createOrReplaceTempView("df1")
df1.show(false)

+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|8    |2020-08|
|1         |limited  |8    |2020-10|
|2         |limited  |1    |2020-09|
|2         |limited  |3    |2020-10|
+----------+---------+-----+-------+

较低的边界--当月<='2020-07‘时得到最大值。将月份改写为“2020-07”

代码语言:javascript
复制
val df2 = spark.sql("""
select product_id, status, price, '2020-07' month from df  where (product_id,month) in 
( select product_id, max(month) from df where month <= '2020-07' group by 1 ) 
""")
df2.createOrReplaceTempView("df2")
df2.show(false)

+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|5    |2020-07|
+----------+---------+-----+-------+

上界-使用<='2020-12‘得到最大值。将月份改写为“2020-12”

代码语言:javascript
复制
val df3 = spark.sql("""
select product_id, status, price, '2020-12' month from df where (product_id, month) in  
( select product_id, max(month) from df where month <= '2020-12' group by 1 ) 
""")
df3.createOrReplaceTempView("df3")
df3.show(false)

+----------+-------+-----+-------+
|product_id|status |price|month  |
+----------+-------+-----+-------+
|1         |limited|8    |2020-12|
|2         |limited|3    |2020-12|
+----------+-------+-----+-------+

现在合并所有的3并将其存储在df4中

代码语言:javascript
复制
val df4 = spark.sql("""
select  product_id, status, price,  month from df1  union all 
select  product_id, status, price,  month from df2  union all 
select  product_id, status, price,  month from df3
order by product_id, month
""")
df4.createOrReplaceTempView("df4")
df4.show(false)

+----------+---------+-----+-------+
|product_id|status   |price|month  |
+----------+---------+-----+-------+
|1         |available|5    |2020-07|
|1         |available|8    |2020-08|
|1         |limited  |8    |2020-10|
|1         |limited  |8    |2020-12|
|2         |limited  |1    |2020-09|
|2         |limited  |3    |2020-10|
|2         |limited  |3    |2020-12|
+----------+---------+-----+-------+

结果:使用序列(date1,date2,间隔1个月)生成缺少月份的日期数组。引爆阵列你就能得到结果。

代码语言:javascript
复制
spark.sql("""
select product_id, status, price, month, explode(dt) res_month from 
(
select t1.*, 
case when months_between(lm||'-01',month||'-01')=1.0 then array(month||'-01')
     when month='2020-12' then array(month||'-01')
     else sequence(to_date(month||'-01'), add_months(to_date(lm||'-01'),-1), interval 1 month ) 
end dt 
     from (
            select product_id, status, price, month, 
            lead(month) over(partition by product_id order by month) lm 
            from df4 
          ) t1 
    ) t2 
  order by product_id, res_month
""")
.show(false)

+----------+---------+-----+-------+----------+
|product_id|status   |price|month  |res_month |
+----------+---------+-----+-------+----------+
|1         |available|5    |2020-07|2020-07-01|
|1         |available|8    |2020-08|2020-08-01|
|1         |available|8    |2020-08|2020-09-01|
|1         |limited  |8    |2020-10|2020-10-01|
|1         |limited  |8    |2020-10|2020-11-01|
|1         |limited  |8    |2020-12|2020-12-01|
|2         |limited  |1    |2020-09|2020-09-01|
|2         |limited  |3    |2020-10|2020-10-01|
|2         |limited  |3    |2020-10|2020-11-01|
|2         |limited  |3    |2020-12|2020-12-01|
+----------+---------+-----+-------+----------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65246883

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档