我有两个数据文件,每个都有一个日期列。ie:
+-----------+
| DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+
+----------+
| DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+我需要计算DT_DATE在给定的引用日期和每个DEADLINES日期之间的日期。
例如:使用2021-03-31作为引用日期应该提供以下结果集。
+-----------+------------+
| DEADLINES| dt_count|
+-----------+------------+
| 2023-07-15| 7|
| 2018-08-10| 0|
| 2022-03-28| 7|
| 2021-06-22| 4|
| 2021-12-18| 7|
| 2021-10-11| 5|
| 2021-11-13| 7|
+-----------+------------+我设法让它在每一行的截止日期、数据和数据之间进行迭代,但是对于一个更大的数据集,性能变得非常差。
有人有更好的解决方案吗?
编辑:这是我目前的解决方案:
def count_days(deadlines_df, dates_df, ref_date):
for row in deadlines_df.collect():
qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
yield row.DEADLINES, qtt
new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])发布于 2021-09-18 15:05:32
这两种数据可以用不同的权重组合在一起,窗口函数的范围从开始到当前行都可以使用(Scala):
val deadlines = Seq(
("2023-07-15"),
("2018-08-10"),
("2022-03-28"),
("2021-06-22"),
("2021-12-18"),
("2021-10-11"),
("2021-11-13")
).toDF("DEADLINES")
val dates = Seq(
("2021-04-02"),
("2021-04-21"),
("2021-05-01"),
("2021-06-03"),
("2021-09-07"),
("2021-10-12"),
("2021-11-02")
).toDF("DT_DATE")
val referenceDate = "2021-03-31"
val united = deadlines.withColumn("weight", lit(0))
.unionAll(
dates
.where($"DT_DATE" >= referenceDate)
.withColumn("weight", lit(1))
)
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = united
.withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
.where($"weight" === lit(0))
.drop("weight")输出:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-03-28|7 |
|2023-07-15|7 |
+----------+--------+注意:计算将在一个分区中执行,Spark显示这样的警告:警告日志记录-没有为窗口操作定义的分区!将所有数据移动到单个分区,会导致严重的性能下降。
还有其他可能的解决方案,按范围连接两个数据文件,这将导致笛卡儿连接。
发布于 2021-09-18 15:58:32
如果你有少量的截止日期,你可以:
columns
dates_df数据中添加一列,值为DT_DATE介于ref_date和截止日期之间,否则dates_df转换结果数据之和,以获得所需的数据格式<代码>H 211<代码>F 212让我们一步一步看
在截止日期前添加一栏:
from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows:
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
.otherwise(
F.lit(0)
)
)然后,通过您的示例,可以得到以下dates_with_deadlines数据:
+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1 |0 |1 |1 |1 |1 |1 |
|2021-04-21|1 |0 |1 |1 |1 |1 |1 |
|2021-05-01|1 |0 |1 |1 |1 |1 |1 |
|2021-06-03|1 |0 |1 |1 |1 |1 |1 |
|2021-09-07|1 |0 |1 |0 |1 |1 |1 |
|2021-10-12|1 |0 |1 |0 |1 |0 |1 |
|2021-11-02|1 |0 |1 |0 |1 |0 |1 |
+----------+----------+----------+----------+----------+----------+----------+----------+和截止日期
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])在这一步之后,您将得到以下aggregated_df数据:
+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7 |0 |7 |4 |7 |5 |7 |
+----------+----------+----------+----------+----------+----------+----------+转置数据
result_df = aggregated_df \
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
.drop(*[str(x.DEADLINES) for x in deadline_rows]) \
.withColumn('data', F.explode('merged')) \
.drop('merged') \
.withColumn('DEADLINES', F.col('data.DEADLINES')) \
.withColumn('dt_count', F.col('data.dt_count')) \
.drop('data')并且您有您期望的result_df数据:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7 |
|2018-08-10|0 |
|2022-03-28|7 |
|2021-06-22|4 |
|2021-12-18|7 |
|2021-10-11|5 |
|2021-11-13|7 |
+----------+--------+完整代码
from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows:
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
.otherwise(
F.lit(0)
)
)
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])
result_df = aggregated_df \
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
.drop(*[str(x.DEADLINES) for x in deadline_rows]) \
.withColumn('data', F.explode('merged')) \
.drop('merged') \
.withColumn('DEADLINES', F.col('data.DEADLINES')) \
.withColumn('dt_count', F.col('data.dt_count')) \
.drop('data')这种解决方案的优点和局限性
使用此解决方案,唯一不能使用分布式系统执行的步骤是转置步骤。
此外,我们没有执行当前的解决方案,而是对Par等位基因中的每个截止日期列执行所有聚合,而不是按顺序执行。
但是,该解决方案只有在只有很少的截止日期(数百个,甚至数千个截止日期)的情况下才能工作,首先是因为我们在使用.collect()的火花驱动程序中检索所有这些截止日期,其次是因为在第一步中,我们每个截止日期创建一列,创建包含大量数据的行,最后是因为最后一步也只在一个执行器上执行。
https://stackoverflow.com/questions/69235181
复制相似问题