首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >两个数据之间的火花计数日期

两个数据之间的火花计数日期
EN

Stack Overflow用户
提问于 2021-09-18 13:39:07
回答 2查看 93关注 0票数 3

我有两个数据文件,每个都有一个日期列。ie:

代码语言:javascript
复制
+-----------+
|  DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+

+----------+
|   DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+

我需要计算DT_DATE在给定的引用日期和每个DEADLINES日期之间的日期。

例如:使用2021-03-31作为引用日期应该提供以下结果集。

代码语言:javascript
复制
+-----------+------------+
|  DEADLINES|    dt_count|
+-----------+------------+
| 2023-07-15|           7|
| 2018-08-10|           0|
| 2022-03-28|           7|
| 2021-06-22|           4|
| 2021-12-18|           7|
| 2021-10-11|           5|
| 2021-11-13|           7|
+-----------+------------+

我设法让它在每一行的截止日期、数据和数据之间进行迭代,但是对于一个更大的数据集,性能变得非常差。

有人有更好的解决方案吗?

编辑:这是我目前的解决方案:

代码语言:javascript
复制
def count_days(deadlines_df, dates_df, ref_date):
    for row in deadlines_df.collect():
        qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
        yield row.DEADLINES, qtt


new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-09-18 15:05:32

这两种数据可以用不同的权重组合在一起,窗口函数的范围从开始到当前行都可以使用(Scala):

代码语言:javascript
复制
val deadlines = Seq(
  ("2023-07-15"),
  ("2018-08-10"),
  ("2022-03-28"),
  ("2021-06-22"),
  ("2021-12-18"),
  ("2021-10-11"),
  ("2021-11-13")
).toDF("DEADLINES")

val dates = Seq(
  ("2021-04-02"),
  ("2021-04-21"),
  ("2021-05-01"),
  ("2021-06-03"),
  ("2021-09-07"),
  ("2021-10-12"),
  ("2021-11-02")
).toDF("DT_DATE")

val referenceDate = "2021-03-31"
val united = deadlines.withColumn("weight", lit(0))
  .unionAll(
    dates
      .where($"DT_DATE" >= referenceDate)
      .withColumn("weight", lit(1))
  )

val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = united
  .withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
  .where($"weight" === lit(0))
  .drop("weight")

输出:

代码语言:javascript
复制
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0       |
|2021-06-22|4       |
|2021-10-11|5       |
|2021-11-13|7       |
|2021-12-18|7       |
|2022-03-28|7       |
|2023-07-15|7       |
+----------+--------+

注意:计算将在一个分区中执行,Spark显示这样的警告:警告日志记录-没有为窗口操作定义的分区!将所有数据移动到单个分区,会导致严重的性能下降。

还有其他可能的解决方案,按范围连接两个数据文件,这将导致笛卡儿连接。

票数 2
EN

Stack Overflow用户

发布于 2021-09-18 15:58:32

如果你有少量的截止日期,你可以:

columns

  • finally
  • 按截止日期在dates_df数据中添加一列,值为DT_DATE介于ref_date和截止日期之间,否则
  • 则将每个截止日期与dates_df转换结果数据之和,以获得所需的数据格式<代码>H 211<代码>F 212

让我们一步一步看

在截止日期前添加一栏:

代码语言:javascript
复制
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
          dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
          F.lit(0)
        )
    )

然后,通过您的示例,可以得到以下dates_with_deadlines数据:

代码语言:javascript
复制
+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE   |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1         |0         |1         |1         |1         |1         |1         |
|2021-04-21|1         |0         |1         |1         |1         |1         |1         |
|2021-05-01|1         |0         |1         |1         |1         |1         |1         |
|2021-06-03|1         |0         |1         |1         |1         |1         |1         |
|2021-09-07|1         |0         |1         |0         |1         |1         |1         |
|2021-10-12|1         |0         |1         |0         |1         |0         |1         |
|2021-11-02|1         |0         |1         |0         |1         |0         |1         |
+----------+----------+----------+----------+----------+----------+----------+----------+

和截止日期

代码语言:javascript
复制
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])

在这一步之后,您将得到以下aggregated_df数据:

代码语言:javascript
复制
+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7         |0         |7         |4         |7         |5         |7         |
+----------+----------+----------+----------+----------+----------+----------+

转置数据

代码语言:javascript
复制
result_df = aggregated_df \
  .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
  .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
  .withColumn('data', F.explode('merged')) \
  .drop('merged') \
  .withColumn('DEADLINES', F.col('data.DEADLINES')) \
  .withColumn('dt_count', F.col('data.dt_count')) \
  .drop('data')

并且您有您期望的result_df数据:

代码语言:javascript
复制
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7       |
|2018-08-10|0       |
|2022-03-28|7       |
|2021-06-22|4       |
|2021-12-18|7       |
|2021-10-11|5       |
|2021-11-13|7       |
+----------+--------+

完整代码

代码语言:javascript
复制
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
          dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
          F.lit(0)
        )
    )

aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])

result_df = aggregated_df \
  .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
  .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
  .withColumn('data', F.explode('merged')) \
  .drop('merged') \
  .withColumn('DEADLINES', F.col('data.DEADLINES')) \
  .withColumn('dt_count', F.col('data.dt_count')) \
  .drop('data')

这种解决方案的优点和局限性

使用此解决方案,唯一不能使用分布式系统执行的步骤是转置步骤。

此外,我们没有执行当前的解决方案,而是对Par等位基因中的每个截止日期列执行所有聚合,而不是按顺序执行。

但是,该解决方案只有在只有很少的截止日期(数百个,甚至数千个截止日期)的情况下才能工作,首先是因为我们在使用.collect()的火花驱动程序中检索所有这些截止日期,其次是因为在第一步中,我们每个截止日期创建一列,创建包含大量数据的行,最后是因为最后一步也只在一个执行器上执行。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69235181

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档