我有这个数据:
+---+----------+------+
| id| date|amount|
+---+----------+------+
|123|2022-11-11|100.00|
|123|2022-11-12|100.00|
|123|2022-11-13|100.00|
|123|2022-11-14|200.00|
|456|2022-11-14|300.00|
|456|2022-11-15|300.00|
|456|2022-11-16|300.00|
|789|2022-11-11|400.00|
|789|2022-11-12|500.00|
+---+----------+------+我需要为每次约会创建新的记录,直到current_date() - 2。将被填充的值必须是最近的值。
例如,如果是date_sub(current_date(), 2) == "2022-11-16",那么我需要以下数据:
+------+----------+-------+
|id | date | amount|
+------+----------+-------+
| 123|2022-11-11|100,00 |
| 123|2022-11-12|100,00 |
| 123|2022-11-13|100,00 |
| 123|2022-11-14|200,00 |
| 123|2022-11-15|200,00 |
| 123|2022-11-16|200,00 |
| 456|2022-11-14|300,00 |
| 456|2022-11-15|300,00 |
| 456|2022-11-16|300,00 |
| 789|2022-11-11|400,00 |
| 789|2022-11-12|500,00 |
| 789|2022-11-13|500,00 |
| 789|2022-11-14|500,00 |
| 789|2022-11-15|500,00 |
| 789|2022-11-16|500,00 |
+------+----------+-------+import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("Complete Rows").getOrCreate()
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, IntegerType, DateType, DecimalType
from datetime import datetime
from decimal import Decimal
vdata = [
(123,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(100)),
(123,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(100)),
(123,datetime.strptime('2022-11-13','%Y-%m-%d'),Decimal(100)),
(123,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(200)),
(456,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(300)),
(456,datetime.strptime('2022-11-15','%Y-%m-%d'),Decimal(300)),
(456,datetime.strptime('2022-11-16','%Y-%m-%d'),Decimal(300)),
(789,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(400)),
(789,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(500))]
schema = StructType([
StructField("id",IntegerType(),False),
StructField("date",DateType(),False),
StructField("amount",DecimalType(10,2),False)])
df = spark.createDataFrame(vdata,schema)
df.show()我试图确定每个ID的最大日期,然后确定该最大日期的最后一个值,然后执行一个F.expr(sequence)来创建记录列表,然后爆炸来创建这些行,但效果不太好。感谢您的任何帮助!
发布于 2022-11-18 13:46:46
我设法找到了以下解决方案。
为了澄清起见,我将其分为三个步骤;当然,如果使代码更加紧凑,您可以编写更少的代码行。
1)查阅
为每个id创建一个具有所有必要日期(包括当前日期和非日期日期)的查找表。
import pyspark.sql.functions as F
from pyspark.sql.window import Window
lookup = (df
.groupby('id')
.agg(
F.min('date').alias('start_date'),
F.date_sub(F.current_date(), 2).alias('end_date')
)
.select('id', F.explode(F.expr('sequence(start_date, end_date, interval 1 day)')).alias('date'))
)
lookup.show()
+---+----------+
| id| date|
+---+----------+
|123|2022-11-11|
|123|2022-11-12|
|123|2022-11-13|
|123|2022-11-14|
|123|2022-11-15|
|123|2022-11-16|
|456|2022-11-14|
|456|2022-11-15|
|456|2022-11-16|
|789|2022-11-11|
|789|2022-11-12|
|789|2022-11-13|
|789|2022-11-14|
|789|2022-11-15|
|789|2022-11-16|
+---+----------+2)加入
之后,我们使用原始的dataframe加入查找表:通过这种方式添加必要的行,并将amount变量设置为null。
df = df.join(lookup, on=['id', 'date'], how='outer')
df.show()
+---+----------+------+
| id| date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15| null|
|123|2022-11-16| null|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13| null|
|789|2022-11-14| null|
|789|2022-11-15| null|
|789|2022-11-16| null|
+---+----------+------+3) last函数
我们使用last函数和ignorenulls=True检索由id划分并按日期排序的窗口中的最后一个非空值。
w = Window.partitionBy('id').orderBy('date').rowsBetween(Window.unboundedPreceding, 0)
df = df.withColumn('amount', F.last('amount', ignorenulls=True).over(w))
df.show()
+---+----------+------+
| id| date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15| 200.0|
|123|2022-11-16| 200.0|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13| 500.0|
|789|2022-11-14| 500.0|
|789|2022-11-15| 500.0|
|789|2022-11-16| 500.0|
+---+----------+------+https://stackoverflow.com/questions/74489283
复制相似问题