
Hive query to find the count for the in-between weeks

Stack Overflow user
Asked on 2019-05-10 05:14:32
2 answers · 358 views · 0 followers · 4 votes

I have a table like the one below:

id      week    count   
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30    
A100    201013  36    
A100    201015  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Here we can see that the following weeks are missing:

  • First, 201014 is missing
  • Second, 201016 is missing
  • Third, 201020, 201021, and 201022 are missing

My requirement is that whenever a week is missing, we need to show the previous week's count for it.

In this case, the output should be:

id      week    count
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30   
A100    201013  36    
A100    201014  36    
A100    201015  43    
A100    201016  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201020  63
A100    201021  63    
A100    201022  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

How can I achieve this requirement using Hive/PySpark?


2 Answers

Stack Overflow user

Posted on 2019-05-13 14:18:18

Although this answer is in Scala, the Python version looks almost the same and can easily be converted.

Step 1:

Identify the rows before which week values are missing.

Sample input:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")

scala> input.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201014|    4| //missing 4 rows
|A100|201016|   45| //missing 1 row
+----+------+-----+ 

To find them, we can use the lead() function on week and compute the difference between leadWeek and week. That difference should not be greater than 1; if it is, rows are missing in between.

val diffDF = input
  .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
  .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week

scala> diffDF.show
+----+------+-----+--------+----+
|  id|  week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
|A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016|   45|    null|null|
+----+------+-----+--------+----+
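
Since the answer notes that the Python version looks almost the same, here is a minimal PySpark sketch of this first step. It assumes an existing SparkSession named spark; the variable names (input_df, diff_df) are only illustrative.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Same sample input as the Scala example
input_df = spark.createDataFrame(
    [("A100", 201008, 2), ("A100", 201009, 9),
     ("A100", 201014, 4), ("A100", 201016, 45)],
    ["id", "week", "count"])

# Partition by id, order by week, and look one row ahead
w = Window.partitionBy("id").orderBy("week")
diff_df = (input_df
           .withColumn("leadWeek", F.lead("week", 1).over(w))
           .withColumn("diff", F.col("leadWeek") - F.col("week") - 1))  # gap size; null on the last row
diff_df.show()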

Step 2:

  • If diff is >= 1: create and append as many rows as diff specifies (InputWithDiff, see the case class below), incrementing the week value accordingly. Return the newly created rows together with the original row.
  • If diff is 0, no extra computation is needed. Return the original row as it is.

For ease of computation, convert diffDF to a Dataset.

case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])

val diffDS = diffDF.as[InputWithDiff]

val output = diffDS.flatMap(x => {
 val diff = x.diff.getOrElse(0) 

 diff match {
  case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count,x.leadWeek, x.diff)).toList  // create and append new Rows
  case _ => List(x)      // return as it is
 }
}).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF

Final output:

scala> output.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201010|    9|
|A100|201011|    9|
|A100|201012|    9|
|A100|201013|    9|
|A100|201014|    4|
|A100|201015|    4|
|A100|201016|   45|
+----+------+-----+
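
For step 2 in PySpark, a flatMap is not strictly needed: each row can be expanded with explode over a generated week range. This is a sketch rather than a literal translation of the Scala code, and it assumes Spark 2.4+ for F.sequence; like the Scala version, it carries the current row's count forward across the gap.

# Expand each row into every week from its own week up to the week just
# before the next existing week (the last row only covers itself).
output = (diff_df
          .withColumn("week",
                      F.explode(F.sequence(
                          F.col("week"),
                          F.coalesce(F.col("leadWeek") - 1, F.col("week")))))
          .select("id", "week", "count")
          .orderBy("id", "week"))
output.show()

Like the original Scala code, this treats week numbers as contiguous integers, so a gap that crosses a year boundary would need extra handling.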
Votes: 3

Stack Overflow user

Posted on 2019-05-17 04:36:20

PySpark solution

Sample data

df = spark.createDataFrame([(1,201901,10),
                            (1,201903,9),
                            (1,201904,21),
                            (1,201906,42),
                            (1,201909,3),
                            (1,201912,56)
                           ],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
|  1| 201901| 10|
|  1| 201903|  9|
|  1| 201904| 21|
|  1| 201906| 42|
|  1| 201909|  3|
|  1| 201912| 56|
+---+-------+---+

1) The basic idea is to use a cross join to create all combinations of id and week (from the minimum possible value to the maximum).

from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()
#Generate all weeks using range
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')
#all_weeks.show()
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')
#id_all_weeks.show()

2) After that, a left join of the original data onto these combinations helps identify the missing values.

res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno|  id|weeknum| val|
+---+------+----+-------+----+
|  1|201911|null|   null|null|
|  1|201905|null|   null|null|
|  1|201903|   1| 201903|   9|
|  1|201904|   1| 201904|  21|
|  1|201901|   1| 201901|  10|
|  1|201906|   1| 201906|  42|
|  1|201908|null|   null|null|
|  1|201910|null|   null|null|
|  1|201912|   1| 201912|  56|
|  1|201907|null|   null|null|
|  1|201902|null|   null|null|
|  1|201909|   1| 201909|   3|
+---+------+----+-------+----+

3) Then use window functions: a sum to assign groups, and a max to fill in the missing values once the groups have been assigned.

w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) #select required columns as needed
missing_values_filled.show() 

+---+------+----+-------+----+---+------+
|aid|weekno|  id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
|  1|201901|   1| 201901|  10|  1|    10|
|  1|201902|null|   null|null|  1|    10|
|  1|201903|   1| 201903|   9|  2|     9|
|  1|201904|   1| 201904|  21|  3|    21|
|  1|201905|null|   null|null|  3|    21|
|  1|201906|   1| 201906|  42|  4|    42|
|  1|201907|null|   null|null|  4|    42|
|  1|201908|null|   null|null|  4|    42|
|  1|201909|   1| 201909|   3|  5|     3|
|  1|201910|null|   null|null|  5|     3|
|  1|201911|null|   null|null|  5|     3|
|  1|201912|   1| 201912|  56|  6|    56|
+---+------+----+-------+----+---+------+
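
The comment in the code above leaves the final column selection open; one possible last step (the variable name final is only illustrative) is:

# Keep only the required columns and rename them back to the input schema
final = (missing_values_filled
         .select(missing_values_filled.aid.alias('id'),
                 missing_values_filled.weekno.alias('weeknum'),
                 missing_values_filled.filled.alias('val'))
         .orderBy('id', 'weeknum'))
final.show()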

A Hive query with the same logic as described above (assuming a table containing all weeks can be created):

select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
            ,w.weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp 
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum
     ) t
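
If the weeks table is built in Spark rather than as a pre-existing Hive table, one option (a sketch reusing df and all_weeks from the PySpark steps above) is to register temporary views under the names the query expects and run it through spark.sql:

# Expose the source data and the generated week range as tbl and weeks_table
df.createOrReplaceTempView('tbl')
all_weeks.withColumnRenamed('weekno', 'weeknum').createOrReplaceTempView('weeks_table')

result = spark.sql("""
select id, weeknum, max(val) over (partition by id, grp) as val
from (select i.id, w.weeknum, t.val,
             sum(case when t.id is null then 0 else 1 end)
                 over (partition by i.id order by w.weeknum) as grp
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum) t
""")
result.show()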
Votes: 3
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/56071059
