I have a Spark dataframe that shows (per day) how many times a product was used. It looks like this:
| x_id | product | usage | yyyy_mm_dd | status |
|------|---------|-------|------------|--------|
| 10 | prod_go | 15 | 2020-10-10 | i |
| 10 | prod_rv | 7 | 2020-10-10 | fc |
| 10 | prod_mb | 0 | 2020-10-10 | n |
| 15 | prod_go | 0 | 2020-10-10 | n |
| 15 | prod_rv | 5 | 2020-10-10 | fc |
| 15 | prod_mb | 1 | 2020-10-10 | fc |
| 10 | prod_go | 20 | 2020-10-11 | i |
| 10 | prod_rv | 11 | 2020-10-11 | i |
| 10 | prod_mb | 3 | 2020-10-11 | fc |
| 15 | prod_go | 0 | 2020-10-11 | n |
| 15 | prod_rv | 5 | 2020-10-11 | fc |
| 15 | prod_mb | 1 | 2020-10-11 | fc |

The `status` column is derived from `usage`: when usage is 0, status is `n`; when usage is between 1 and 9, status is `fc`; when usage is >= 10, status is `i`.
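For clarity, the status rule can be sketched as a plain-Python function (`status_for` is a hypothetical helper for illustration only; the column is assumed to already exist in the dataframe):

```python
def status_for(usage: int) -> str:
    """Hypothetical helper: map a daily usage count to its status code."""
    if usage == 0:
        return 'n'    # no usage
    elif usage < 10:
        return 'fc'   # usage between 1 and 9
    else:
        return 'i'    # usage >= 10

print(status_for(0), status_for(7), status_for(15))  # → n fc i
```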
I would like to add two extra columns to this Spark dataframe, date_reached_fc and date_reached_i. These columns should hold the min(yyyy_mm_dd) on which each status was first reached, per x_id and product.
Based on the sample data, the output would look like this:
| x_id | product | usage | yyyy_mm_dd | status | date_reached_fc | date_reached_i |
|------|---------|-------|------------|--------|-----------------|----------------|
| 10 | prod_go | 15 | 2020-10-10 | i | null | 2020-10-10 |
| 10 | prod_rv | 7 | 2020-10-10 | fc | 2020-10-10 | null |
| 10 | prod_mb | 0 | 2020-10-10 | n | null | null |
| 15 | prod_go | 0 | 2020-10-10 | n | null | null |
| 15 | prod_rv | 5 | 2020-10-10 | fc | 2020-10-10 | null |
| 15 | prod_mb | 1 | 2020-10-10 | fc | 2020-10-10 | null |
| 10 | prod_go | 20 | 2020-10-11 | i | null | 2020-10-10 |
| 10 | prod_rv | 11 | 2020-10-11 | i | 2020-10-10 | 2020-10-11 |
| 10 | prod_mb | 3 | 2020-10-11 | fc | 2020-10-11 | null |
| 15 | prod_go | 0 | 2020-10-11 | n | null | null |
| 15 | prod_rv | 5 | 2020-10-11 | fc | 2020-10-10 | null |
| 15 | prod_mb | 1 | 2020-10-11 | fc | 2020-10-10 | null |

Posted on 2020-12-23 21:49:20
The ordering is slightly different from your question, but the results should be correct... Basically, just use min over a window, combined with when so that only the relevant dates are considered.
```python
from pyspark.sql import functions as F, Window

# Window over each (x_id, product) pair, ordered by date (usage as tie-breaker);
# with an orderBy, min() over the window is cumulative up to the current row.
w = Window.partitionBy('x_id', 'product').orderBy('yyyy_mm_dd', 'usage')

df2 = df.withColumn(
    'date_reached_fc',
    # min ignores nulls, so this is the earliest date so far with status 'fc'
    F.min(F.when(F.col('status') == 'fc', F.col('yyyy_mm_dd'))).over(w)
).withColumn(
    'date_reached_i',
    F.min(F.when(F.col('status') == 'i', F.col('yyyy_mm_dd'))).over(w)
).orderBy('x_id', 'product', 'yyyy_mm_dd', 'usage')

df2.show()
```
+----+-------+-----+----------+------+---------------+--------------+
|x_id|product|usage|yyyy_mm_dd|status|date_reached_fc|date_reached_i|
+----+-------+-----+----------+------+---------------+--------------+
| 10|prod_go| 15|2020-10-10| i| null| 2020-10-10|
| 10|prod_go| 20|2020-10-11| i| null| 2020-10-10|
| 10|prod_mb| 0|2020-10-10| n| null| null|
| 10|prod_mb| 3|2020-10-11| fc| 2020-10-11| null|
| 10|prod_rv| 7|2020-10-10| fc| 2020-10-10| null|
| 10|prod_rv| 11|2020-10-11| i| 2020-10-10| 2020-10-11|
| 15|prod_go| 0|2020-10-10| n| null| null|
| 15|prod_go| 0|2020-10-11| n| null| null|
| 15|prod_mb| 1|2020-10-10| fc| 2020-10-10| null|
| 15|prod_mb| 1|2020-10-11| fc| 2020-10-10| null|
| 15|prod_rv| 5|2020-10-10| fc| 2020-10-10| null|
| 15|prod_rv| 5|2020-10-11| fc| 2020-10-10| null|
+----+-------+-----+----------+------+---------------+--------------+

https://stackoverflow.com/questions/65425247
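The reason this works is that `F.min` skips nulls, so `min(when(cond, date))` yields the earliest date on which the condition held, or null if it never did; because the window is ordered, that minimum fills in cumulatively, so the date only appears from the day the status is first reached onward. A rough pure-Python analogue of the null-skipping idea, computed over a whole partition (`first_date_with_status` is a hypothetical helper for intuition only):

```python
def first_date_with_status(rows, target):
    """Earliest date whose status equals `target`, or None if never reached.

    `rows` is a list of (yyyy_mm_dd, status) tuples for one
    (x_id, product) partition; mirrors min(when(...)) skipping nulls.
    """
    dates = [d for d, s in rows if s == target]
    return min(dates) if dates else None

# Sample partition: x_id 10, prod_rv from the question
prod_rv_10 = [('2020-10-10', 'fc'), ('2020-10-11', 'i')]
print(first_date_with_status(prod_rv_10, 'fc'))  # → 2020-10-10
print(first_date_with_status(prod_rv_10, 'i'))   # → 2020-10-11
print(first_date_with_status(prod_rv_10, 'n'))   # → None
```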