I am modifying the function described here to use pyspark.

Input
from pyspark.sql import functions as F
data_in = spark.createDataFrame([
[1, "2017-1-1", "2017-6-30"], [1, "2017-1-1", "2017-1-3"], [1, "2017-5-1", "2017-9-30"],
[1, "2018-5-1", "2018-9-30"], [1, "2018-5-2", "2018-10-31"], [1, "2017-4-1", "2017-5-30"],
[1, "2017-10-3", "2017-10-3"], [1, "2016-12-5", "2016-12-31"], [1, "2016-12-1", "2016-12-2"],
[2, "2016-12-1", "2016-12-2"], [2, "2016-12-3", "2016-12-25"]
], schema=["id","start_dt","end_dt"])
data_in = data_in.select("id", F.to_date("start_dt","yyyy-M-d").alias("start_dt"),
                         F.to_date("end_dt","yyyy-M-d").alias("end_dt")).sort(["id","start_dt","end_dt"])

Apply the aggregation function
from datetime import datetime
mydt = datetime(1970,1,1).date()
def merge_dates(grp):
    dt_groups = ((grp["start_dt"] - grp["end_dt"].shift(fill_value=mydt)).dt.days > 1).cumsum()
    grouped = grp.groupby(dt_groups).agg({"start_dt": "min", "end_dt": "max"})
    return grouped if len(grp) == len(grouped) else merge_dates(grouped)

Testing with Pandas
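To see how the grouping key in merge_dates works: a row starts a new group whenever its start_dt is more than one day after the previous row's end_dt, and the cumulative sum turns those flags into group ids, so overlapping or adjacent ranges share an id and get merged. A minimal sketch with toy data (not the question's data; a pd.Timestamp is used here as the placeholder instead of datetime(1970,1,1).date()):

```python
import pandas as pd

# Stand-in for the question's 1970-01-01 placeholder date.
mydt = pd.Timestamp("1970-01-01")

# Toy ranges: the second and third overlap/touch, the first stands alone.
grp = pd.DataFrame({
    "start_dt": pd.to_datetime(["2016-12-01", "2016-12-05", "2017-01-01"]),
    "end_dt":   pd.to_datetime(["2016-12-02", "2016-12-31", "2017-06-30"]),
})

# True marks the start of a new group; cumsum converts flags to group ids.
dt_groups = ((grp["start_dt"] - grp["end_dt"].shift(fill_value=mydt)).dt.days > 1).cumsum()
print(dt_groups.tolist())  # [1, 2, 2] -> rows 2 and 3 merge into one range
```

Rows sharing an id are then collapsed by the groupby/agg into one min-start, max-end range.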
df = data_in.toPandas()
df.groupby("id").apply(merge_dates).reset_index().drop('level_1', axis=1)

Output
id start_dt end_dt
0 1 2016-12-01 2016-12-02
1 1 2016-12-05 2017-09-30
2 1 2017-10-03 2017-10-03
3 1 2018-05-01 2018-10-31
4 2 2016-12-01 2016-12-25

When I try to run it with Spark
data_out = data_in.groupby("id").applyInPandas(merge_dates, schema=data_in.schema)
display(data_out)

I get the following error:
PythonException: 'RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 3 Actual: 2'. Full traceback below:

When I change the schema to data_in.schema[1:], I get back only the correctly computed date columns (matching the Pandas output), but not the field id, which is obviously required. How can I fix this so that the final output also has id?
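The mismatch arises because applyInPandas hands each group to the function with the grouping column included, but merge_dates returns only the two aggregated date columns, while the declared schema has three. One way to satisfy the schema, sketched here at the pandas level with a hypothetical wrapper name merge_dates_with_id, is to re-attach the group's id before returning (a pd.Timestamp placeholder is used instead of datetime(1970,1,1).date()):

```python
import pandas as pd

# Stand-in for the question's 1970-01-01 placeholder date.
mydt = pd.Timestamp("1970-01-01")

def merge_dates(grp):
    dt_groups = ((grp["start_dt"] - grp["end_dt"].shift(fill_value=mydt)).dt.days > 1).cumsum()
    grouped = grp.groupby(dt_groups).agg({"start_dt": "min", "end_dt": "max"})
    return grouped if len(grp) == len(grouped) else merge_dates(grouped)

def merge_dates_with_id(pdf):
    # applyInPandas passes the whole group, grouping column included, and
    # each group holds exactly one id, so re-attach it before returning.
    merged = merge_dates(pdf).reset_index(drop=True)
    merged.insert(0, "id", pdf["id"].iloc[0])
    return merged

# Then (untested sketch):
# data_out = data_in.groupby("id").applyInPandas(merge_dates_with_id, schema=data_in.schema)
```

With the id column re-attached, the returned frame has the three columns the schema expects.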
Posted on 2021-10-05 19:54:13
With Spark, if we replicate what you have in Pandas, it would look like the following:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
w = W.partitionBy("id").orderBy(F.monotonically_increasing_id())
w1 = w.rangeBetween(W.unboundedPreceding,0)
out = (data_in.withColumn("helper", F.datediff(F.col("start_dt"),
                                               F.lag("end_dt").over(w)) > 1)
              .fillna({"helper": True})
              .withColumn("helper2", F.sum(F.col("helper").cast("int")).over(w1))
              .groupBy("id", "helper2").agg(F.min("start_dt").alias("start_dt"),
                                            F.max("end_dt").alias("end_dt"))
              .drop("helper2"))

out.show()
+---+----------+----------+
| id| start_dt| end_dt|
+---+----------+----------+
| 1|2016-12-01|2016-12-02|
| 1|2016-12-05|2017-09-30|
| 1|2017-10-03|2017-10-03|
| 1|2018-05-01|2018-10-31|
| 2|2016-12-01|2016-12-25|
+---+----------+----------+

Note that this assumes mydt = datetime(1970,1,1).date() is just a placeholder for the null produced by shifting; I used True for the same via fillna. If not, you can fill in the value after the lag, which is the equivalent of shift.
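The equivalence noted above can be sketched in pandas with toy data (assumed, not from the question): shifting with a fill value is the same as shifting first and filling the resulting null afterwards, which is what fillna after F.lag does on the Spark side.

```python
import pandas as pd

# Two toy end dates; shifting introduces one null at the top.
end_dt = pd.Series(pd.to_datetime(["2016-12-02", "2016-12-31"]))

# shift(fill_value=...) in one step vs. shift then fillna in two steps.
shift_with_fill = end_dt.shift(fill_value=pd.Timestamp("1970-01-01"))
shift_then_fill = end_dt.shift().fillna(pd.Timestamp("1970-01-01"))

print(shift_with_fill.equals(shift_then_fill))  # True
```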
https://stackoverflow.com/questions/69455759