首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >化工厂熊猫UDF合并日期范围为多行

化工厂熊猫UDF合并日期范围为多行
EN

Stack Overflow用户
提问于 2021-10-05 18:47:23
回答 1查看 226关注 0票数 1

我正在修改描述为here的函数,以使用pyspark。

输入

代码语言:javascript
复制
from pyspark.sql import functions as F

data_in = spark.createDataFrame([
    [1, "2017-1-1", "2017-6-30"], [1, "2017-1-1", "2017-1-3"], [1, "2017-5-1", "2017-9-30"],
    [1, "2018-5-1", "2018-9-30"], [1, "2018-5-2", "2018-10-31"], [1, "2017-4-1", "2017-5-30"],
    [1, "2017-10-3", "2017-10-3"], [1, "2016-12-5", "2016-12-31"], [1, "2016-12-1", "2016-12-2"],
    [2, "2016-12-1", "2016-12-2"], [2, "2016-12-3", "2016-12-25"]
  ], schema=["id","start_dt","end_dt"])

data_in = data_in.select("id", F.to_date("start_dt","yyyy-M-d").alias("start_dt"), 
               F.to_date("end_dt","yyyy-M-d").alias("end_dt")).sort(["id","start_dt","end_dt"])

应用聚合函数

代码语言:javascript
复制
from datetime import datetime

mydt = datetime(1970,1,1).date()
def merge_dates(grp):
  dt_groups = ((grp["start_dt"]-grp["end_dt"].shift(fill_value=mydt)).dt.days > 1).cumsum()
  grouped = grp.groupby(dt_groups).agg({"start_dt":"min", "end_dt":"max"})
  return grouped if len(grp)==len(grouped) else merge_dates(grouped)

使用Pandas进行测试

代码语言:javascript
复制
df = data_in.toPandas()
df.groupby("id").apply(merge_dates).reset_index().drop('level_1', axis=1)

输出

代码语言:javascript
复制
   id    start_dt      end_dt
0   1  2016-12-01  2016-12-02
1   1  2016-12-05  2017-09-30
2   1  2017-10-03  2017-10-03
3   1  2018-05-01  2018-10-31
4   2  2016-12-01  2016-12-25

当我尝试用星火运行它时

代码语言:javascript
复制
data_out = data_in.groupby("id").applyInPandas(merge_dates, schema=data_in.schema)
display(data_out)

我得到以下错误

代码语言:javascript
复制
PythonException: 'RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 3 Actual: 2'. Full traceback below:

当我将模式更改为data_in.schema1时:我只返回正确计算的日期列(与Pandas输出匹配),但不返回字段id --这显然是必需的。我如何解决这个问题,以便最终的输出也具有id

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-05 19:54:13

只要有火花,如果我们复制你在熊猫身上拥有的东西,它会看起来如下所示:

代码语言:javascript
复制
from pyspark.sql import functions as F
w = W.partitionBy("id").orderBy(F.monotonically_increasing_id())
w1 = w.rangeBetween(W.unboundedPreceding,0)

out = (data_in.withColumn("helper",F.datediff(F.col("start_dt"),
                                    F.lag("end_dt").over(w))>1)
     .fillna({"helper":True})
     .withColumn("helper2",F.sum(F.col("helper").cast("int")).over(w1))
     .groupBy("id","helper2").agg(F.min("start_dt").alias("start_dt"),
                    F.max("end_dt").alias("end_dt")
                    )
.drop("helper2"))

代码语言:javascript
复制
out.show()

+---+----------+----------+
| id|  start_dt|    end_dt|
+---+----------+----------+
|  1|2016-12-01|2016-12-02|
|  1|2016-12-05|2017-09-30|
|  1|2017-10-03|2017-10-03|
|  1|2018-05-01|2018-10-31|
|  2|2016-12-01|2016-12-25|
+---+----------+----------+

请注意,这假设mydt = datetime(1970,1,1).date()只是值转换时的空值占位符,.i使用了.i作为True表示相同的值。如果不是,您可以在lag之后填入,这与shift相同

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69455759

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档