我想要做的是用项目和日期来制作一个,以及另一个列"3_avg“,这是从给定日期开始的最后三天的平均值。另一种说法是,如果2022-5-5是星期四,我希望该行的3_avg值是过去三个星期四该商品的平均销售额,因此4/28、4/21和4/14。
到目前为止我已经知道了,但这只是一周中整个专栏的平均值.我不知道怎样才能把它按项目和日期区分开来,只使用最后三个?我试图让它与day_of_week一起工作,但我的大脑无法将它与我需要发生的事情联系起来。
df_fcst_dow = (
df_avg
.withColumn("day_of_week", F.dayofweek(F.col("trn_dt")))
.groupBy("item", "date", "day_of_week")
.agg(
F.sum(F.col("sales") / 3).alias("3_avg")
)
)发布于 2022-05-06 17:48:57
你可以用一个窗口来做这件事,也可以用一个组来做。在这里,我鼓励分组,因为它将更好地分配工作之间的工人节点。我们创建当前日期和下两个日期的数组。然后我们爆炸这个数组,给我们的数据重复计算所有我们想要的日期,这样我们就可以把它分组成一个平均值。
import pyspark.sql.functions as F
>>> spark.table("trn_dt").show()
+----+----------+-----+
|item| date|sales|
+----+----------+-----+
| 1|2016-01-03| 16.0|
| 1|2016-01-02| 15.0|
| 1|2016-01-05| 9.0|
| 1|2016-01-04| 10.0|
| 1|2016-01-01| 11.0|
| 1|2016-01-07| 10.0|
| 1|2016-01-06| 7.0|
+----+----------+-----+
df_avg.withColumn( "dates",
F.array( #building array of dates
df_avg["date"],
F.date_add( df_avg["date"], 1),
F.date_add( df_avg["date"], 2)
)).select(
F.col("item"),
F.explode("dates") ).alias("ThreeDayAve"), # tripling our data
F.col("sales")
).groupBy( "item","ThreeDayAve")
.agg( F.avg("sales").alias("3_avg")).show()
+----+-----------+------------------+
|item|ThreeDayAve| 3_avg|
+----+-----------+------------------+
| 1| 2016-01-05|11.666666666666666|
| 1| 2016-01-04|13.666666666666666|
| 1| 2016-01-07| 8.666666666666666|
| 1| 2016-01-01| 11.0|
| 1| 2016-01-03| 14.0|
| 1| 2016-01-02| 13.0|
| 1| 2016-01-09| 10.0|
| 1| 2016-01-06| 8.666666666666666|
| 1| 2016-01-08| 8.5|
+----+-----------+------------------+您可能会在这方面使用窗口,但它在大型数据集上的性能不会很好。
https://stackoverflow.com/questions/72134398
复制相似问题