我们正在将一个巨大的代码库从Spark2迁移到Spark3.x。为了使迁移逐步进行,一些信任被设置为具有与Spark2.x中相同的行为。然而,函数add_months没有“遗留”模式。在Spark3中,根据迁移文档
在Spark3.0中,如果最初的日期是月份的最后一天,则add_months函数不会将结果日期调整为月的最后一天。例如,选择add_months(日期‘2019-02-28’,1)结果2019-03-28.在星火2.4及更低版本中,当原始日期是月份的最后一天时,结果日期将进行调整。例如,在20990-02-28年增加一个月的结果.
当Spark2.x将结果日期调整到这个月的最后一天时。显而易见的解决方案是为它编写一个包装器,但是我不知道Spark3中是否有任何配置来获得add_months Spark2行为。
编辑:
最后,我在ScalaSpark3.x中实现了add_months的包装器:
object functions {
def add_months(startDate: Column, numMonths: Int): Column = add_months(startDate, lit(numMonths))
def add_months(startDate: Column, numMonths: Column): Column = {
val addedMonthsSpark = add_months_spark(startDate, numMonths)
val startDateIsLastDay = last_day(startDate) === startDate
when(startDateIsLastDay, last_day(addedMonthsSpark)).otherwise(addedMonthsSpark)
}
}发布于 2022-01-15 13:39:00
下面是您提到的包装器的Python实现。
def add_months(start_date: str or Column, num_months: int):
if isinstance(start_date, str):
start_date = f.col(start_date)
add_months_spark = f.add_months(start_date, num_months)
start_date_is_last_day = f.last_day(start_date) == start_date
return f.when(
start_date_is_last_day,
f.last_day(add_months_spark)
).otherwise(add_months_spark)此外,还可以通过使用isinstance来避免使用singledispatch来重载start_date。
https://stackoverflow.com/questions/69583594
复制相似问题