我有一个数据,如下所示,
+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype| createat|
+-----+----------+---------+-------+-------------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34|
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34|
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34|
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")需要添加新列并仅为字段名称为“作业阶段”的行添加值。值应该是对应的作业阶段的最新状态(下一行签入)。在选择最新的需要时,如果是“状态”,就需要检查coltype值。
预期数据:
+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype| createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34| sttaus10|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34| |
+-----+----------+---------+-------+-------------------+-------------+我尝试了铅,滞后,row_number,但没有得到预期的结果。
发布于 2022-06-16 11:46:04
问题是标记为火花放电,因此我正在编写一种方法,使用first()窗口函数来完成火花放电中所需的操作。
data_sdf. \
withColumn('latest',
func.when(func.col('fieldmname') == 'jobstage',
func.first(func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'), func.col('new_value')), ignorenulls=True).
over(wd.partitionBy('jobid').orderBy('createat').rowsBetween(0, sys.maxsize))
).
otherwise(func.lit(''))
). \
show()
# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype| createat| latest|
# +-----+----------+---------+-------+-------------------+--------+
# | 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
# | 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
# | 1| jobstage| sttaus3| null|2022-10-10 14:11:34|sttaus10|
# | 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
# | 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
# | 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
# | 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
# | 2| jobstatus| sttaus2| status|2022-11-10 12:11:34| |
# +-----+----------+---------+-------+-------------------+--------+因此,它将考虑对应记录中的第一条记录,其中fieldmname为“作业状态”,coltype为“状态”。
https://stackoverflow.com/questions/72643346
复制相似问题