首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花放电数据中的爆炸日期和回填行

火花放电数据中的爆炸日期和回填行
EN

Stack Overflow用户
提问于 2022-11-18 11:54:41
回答 1查看 40关注 0票数 1

我有这个数据:

代码语言:javascript
复制
+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11|100.00|
|123|2022-11-12|100.00|
|123|2022-11-13|100.00|
|123|2022-11-14|200.00|
|456|2022-11-14|300.00|
|456|2022-11-15|300.00|
|456|2022-11-16|300.00|
|789|2022-11-11|400.00|
|789|2022-11-12|500.00|
+---+----------+------+

我需要为每次约会创建新的记录,直到current_date() - 2。将被填充的值必须是最近的值。

例如,如果是date_sub(current_date(), 2) == "2022-11-16",那么我需要以下数据:

代码语言:javascript
复制
+------+----------+-------+
|id    |    date  | amount|
+------+----------+-------+
|   123|2022-11-11|100,00 |
|   123|2022-11-12|100,00 |
|   123|2022-11-13|100,00 |
|   123|2022-11-14|200,00 |
|   123|2022-11-15|200,00 |
|   123|2022-11-16|200,00 |
|   456|2022-11-14|300,00 |
|   456|2022-11-15|300,00 |
|   456|2022-11-16|300,00 |
|   789|2022-11-11|400,00 |
|   789|2022-11-12|500,00 |
|   789|2022-11-13|500,00 |
|   789|2022-11-14|500,00 |
|   789|2022-11-15|500,00 |
|   789|2022-11-16|500,00 |
+------+----------+-------+
代码语言:javascript
复制
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("Complete Rows").getOrCreate()

from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, IntegerType, DateType, DecimalType
from datetime import datetime
from decimal import Decimal

vdata = [
    (123,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-13','%Y-%m-%d'),Decimal(100)),
    (123,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(200)),
    (456,datetime.strptime('2022-11-14','%Y-%m-%d'),Decimal(300)),
    (456,datetime.strptime('2022-11-15','%Y-%m-%d'),Decimal(300)),
    (456,datetime.strptime('2022-11-16','%Y-%m-%d'),Decimal(300)),
    (789,datetime.strptime('2022-11-11','%Y-%m-%d'),Decimal(400)),
    (789,datetime.strptime('2022-11-12','%Y-%m-%d'),Decimal(500))]

schema = StructType([
    StructField("id",IntegerType(),False),
    StructField("date",DateType(),False),
    StructField("amount",DecimalType(10,2),False)])

df = spark.createDataFrame(vdata,schema)

df.show()

我试图确定每个ID的最大日期,然后确定该最大日期的最后一个值,然后执行一个F.expr(sequence)来创建记录列表,然后爆炸来创建这些行,但效果不太好。感谢您的任何帮助!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-18 13:46:46

我设法找到了以下解决方案。

为了澄清起见,我将其分为三个步骤;当然,如果使代码更加紧凑,您可以编写更少的代码行。

1)查阅

为每个id创建一个具有所有必要日期(包括当前日期和非日期日期)的查找表。

代码语言:javascript
复制
import pyspark.sql.functions as F
from pyspark.sql.window import Window

lookup = (df
          .groupby('id')
          .agg(
            F.min('date').alias('start_date'),
            F.date_sub(F.current_date(), 2).alias('end_date')
          )
          .select('id', F.explode(F.expr('sequence(start_date, end_date, interval 1 day)')).alias('date'))
         )
lookup.show()

+---+----------+
| id|      date|
+---+----------+
|123|2022-11-11|
|123|2022-11-12|
|123|2022-11-13|
|123|2022-11-14|
|123|2022-11-15|
|123|2022-11-16|
|456|2022-11-14|
|456|2022-11-15|
|456|2022-11-16|
|789|2022-11-11|
|789|2022-11-12|
|789|2022-11-13|
|789|2022-11-14|
|789|2022-11-15|
|789|2022-11-16|
+---+----------+

2)加入

之后,我们使用原始的dataframe加入查找表:通过这种方式添加必要的行,并将amount变量设置为null。

代码语言:javascript
复制
df = df.join(lookup, on=['id', 'date'], how='outer')
df.show()

+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15|  null|
|123|2022-11-16|  null|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13|  null|
|789|2022-11-14|  null|
|789|2022-11-15|  null|
|789|2022-11-16|  null|
+---+----------+------+

3) last函数

我们使用last函数和ignorenulls=True检索由id划分并按日期排序的窗口中的最后一个非空值。

代码语言:javascript
复制
w = Window.partitionBy('id').orderBy('date').rowsBetween(Window.unboundedPreceding, 0)

df = df.withColumn('amount', F.last('amount', ignorenulls=True).over(w))
df.show()

+---+----------+------+
| id|      date|amount|
+---+----------+------+
|123|2022-11-11| 100.0|
|123|2022-11-12| 100.0|
|123|2022-11-13| 100.0|
|123|2022-11-14| 200.0|
|123|2022-11-15| 200.0|
|123|2022-11-16| 200.0|
|456|2022-11-14| 300.0|
|456|2022-11-15| 300.0|
|456|2022-11-16| 300.0|
|789|2022-11-11| 400.0|
|789|2022-11-12| 500.0|
|789|2022-11-13| 500.0|
|789|2022-11-14| 500.0|
|789|2022-11-15| 500.0|
|789|2022-11-16| 500.0|
+---+----------+------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74489283

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档