How to compute a series of status ranges from daily records?

Stack Overflow user
Asked 2021-08-13 22:06:38
Answers: 1 · Views: 69 · Followers: 0 · Score: 3

I have a df in the following format:

| name | status    | date  |
____________________________
| ben  | active    | 01/01 |
| ben  | active    | 01/02 |
| ben  | active    | 01/03 |
| ben  | in-active | 01/04 |
| ben  | in-active | 01/05 |
| ben  | active    | 01/06 |
| ben  | active    | 01/07 |

I need to create a df in the following format:

| name | status    | start_date | end_date |
____________________________________________
| ben  | active    |   01/01    |   01/03  |
| ben  | in-active |   01/04    |   01/05  |
| ben  | active    |   01/06    |   01/07  |

I'm having a hard time finding the best way to do this.
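The requested output is a "collapse consecutive runs" operation: rows are merged only while name and status stay the same from one day to the next. As a minimal plain-Python sketch of those semantics (not Spark code), `itertools.groupby` does exactly this, because it only merges *adjacent* items with equal keys. The sketch assumes the rows are already sorted by name and then date, and it keeps the question's MM/DD strings as-is, which only sort correctly within a single year.

```python
from itertools import groupby
from operator import itemgetter

# Rows from the question as (name, status, date), assumed sorted by name, then date.
rows = [
    ("ben", "active", "01/01"),
    ("ben", "active", "01/02"),
    ("ben", "active", "01/03"),
    ("ben", "in-active", "01/04"),
    ("ben", "in-active", "01/05"),
    ("ben", "active", "01/06"),
    ("ben", "active", "01/07"),
]


def status_ranges(rows):
    """Collapse consecutive rows with the same (name, status) into
    (name, status, start_date, end_date) tuples."""
    ranges = []
    # groupby starts a new group whenever the (name, status) key changes,
    # so a status that reappears later produces a separate run.
    for (name, status), run in groupby(rows, key=itemgetter(0, 1)):
        run = list(run)
        ranges.append((name, status, run[0][2], run[-1][2]))
    return ranges


for r in status_ranges(rows):
    print(r)
```

This is useful mainly as a reference to check a distributed implementation against on small samples; in Spark the same "adjacent rows" idea has to be expressed with window functions, since there is no ordered groupby over a DataFrame.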


1 Answer

Stack Overflow user

Accepted answer

Posted 2021-08-13 22:06:38

Working out the end date (the maximum date) of each status range takes a few tricks, but this code should do what you want.

from pyspark.sql import types as T, functions as F, SparkSession, Window
import datetime
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("name", T.StringType(), False),
  T.StructField("status", T.StringType(), False),
  T.StructField("date", T.DateType(), False),
])
# Sample data for one person (the dates differ slightly from the question's table)
data = [
  {"name": "ben", "status": "active", "date": datetime.date(day=1, month=1, year=2021)},
  {"name": "ben", "status": "active", "date": datetime.date(day=2, month=1, year=2021)},
  {"name": "ben", "status": "inactive", "date": datetime.date(day=3, month=1, year=2021)},
  {"name": "ben", "status": "inactive", "date": datetime.date(day=4, month=1, year=2021)},
  {"name": "ben", "status": "active", "date": datetime.date(day=5, month=1, year=2021)},
  {"name": "ben", "status": "active", "date": datetime.date(day=6, month=1, year=2021)},
  {"name": "ben", "status": "active", "date": datetime.date(day=7, month=1, year=2021)},
]

df = spark.createDataFrame(data, schema)

df.show()

"""
+----+--------+----------+
|name|  status|      date|
+----+--------+----------+
| ben|  active|2021-01-01|
| ben|  active|2021-01-02|
| ben|inactive|2021-01-03|
| ben|inactive|2021-01-04|
| ben|  active|2021-01-05|
| ben|  active|2021-01-06|
| ben|  active|2021-01-07|
+----+--------+----------+
"""

# One window per person, ordered by date: lag() looks one row back, lead() one row ahead
date_window = Window.partitionBy("name").orderBy("date")
df = df.select(
  "*",
  F.lag("status").over(date_window).alias("previous_status"),
  F.lead("date").over(date_window).alias("next_date")
)

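# Keep the first row of every status run (status changed, or no previous row)
# plus each person's last row (no next row), which acts as a closing marker
boundaries = df.filter(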
  (F.col("status") != F.col("previous_status")) | (F.col("previous_status").isNull()) | (F.col("next_date").isNull())
)

boundaries.show()

"""
+----+--------+----------+---------------+----------+
|name|  status|      date|previous_status| next_date|
+----+--------+----------+---------------+----------+
| ben|  active|2021-01-01|           null|2021-01-02|
| ben|inactive|2021-01-03|         active|2021-01-04|
| ben|  active|2021-01-05|       inactive|2021-01-06|
| ben|  active|2021-01-07|         active|      null|
+----+--------+----------+---------------+----------+
"""

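# date_window is re-applied to the filtered rows, so lead() now jumps to the next
# boundary: the start of the following run, or the closing last row
computed_ends = boundaries.select(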
  "*",
  F.lead("date").over(date_window).alias("maybe_end_date"),
)

computed_ends.show()

"""
+----+--------+----------+---------------+----------+--------------+
|name|  status|      date|previous_status| next_date|maybe_end_date|
+----+--------+----------+---------------+----------+--------------+
| ben|  active|2021-01-01|           null|2021-01-02|    2021-01-03|
| ben|inactive|2021-01-03|         active|2021-01-04|    2021-01-05|
| ben|  active|2021-01-05|       inactive|2021-01-06|    2021-01-07|
| ben|  active|2021-01-07|         active|      null|          null|
+----+--------+----------+---------------+----------+--------------+
"""

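# Look one boundary further ahead; next_end_date is null for each person's
# last two boundary rows (the start of the final run and the closing row)
unbounded_end = computed_ends.select(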
  "*",
  F.lead("maybe_end_date").over(date_window).alias("next_end_date")
)

unbounded_end.show()

"""
+----+--------+----------+---------------+----------+--------------+-------------+
|name|  status|      date|previous_status| next_date|maybe_end_date|next_end_date|
+----+--------+----------+---------------+----------+--------------+-------------+
| ben|  active|2021-01-01|           null|2021-01-02|    2021-01-03|   2021-01-05|
| ben|inactive|2021-01-03|         active|2021-01-04|    2021-01-05|   2021-01-07|
| ben|  active|2021-01-05|       inactive|2021-01-06|    2021-01-07|         null|
| ben|  active|2021-01-07|         active|      null|          null|         null|
+----+--------+----------+---------------+----------+--------------+-------------+
"""

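# A run followed by another run ends the day before that run starts
# (maybe_end_date - 1); the final run ends on the closing row's date, and the
# closing row itself has no maybe_end_date, so the filter drops it.
# Caveat: "minus one day" assumes exactly one row per calendar day. Also, if a
# person's last row starts a new status, that one-day run is dropped and the
# previous run's end_date comes out one day late.
corrected_end = unbounded_end.select(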
  F.col("name"),
  F.col("status"),
  F.col("date").alias("start_date"),
  F.when(
    F.col("next_end_date").isNull(),
    F.col("maybe_end_date")
  ).otherwise(
    F.date_sub(
      F.col("maybe_end_date"),
      1
    )
  ).alias("end_date")
).filter(
  F.col("end_date").isNotNull()
)

corrected_end.show()

"""
+----+--------+----------+----------+
|name|  status|start_date|  end_date|
+----+--------+----------+----------+
| ben|  active|2021-01-01|2021-01-02|
| ben|inactive|2021-01-03|2021-01-04|
| ben|  active|2021-01-05|2021-01-07|
+----+--------+----------+----------+
"""
Score: 3
Original page content provided by Stack Overflow; translation support by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/68778798
