我想知道是否有可能覆盖一份工作的cron计划。在我的例子中,我想每个月每6个工作日运行一份Dagster的工作。因此,我编写了一个Python函数,返回下一个月的下一个工作日,并用cron符号编写了这个函数。然后,在工作按照计划运行之后,我想将cron计划覆盖到下一个月的第六个工作日。
到目前为止,这是我的解决方案:
next_schedule = find_6th_business_day()
@schedule(cron_schedule=next_schedule, job=my_job, execution_timezone="Europe/Berlin")
def my_scheduler(context):
run_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
# update cron schedule
global next_schedule
next_schedule = find_6th_business_day()
return {"ops": {"op1": {"config": {"date": run_date}},
"op2": {"config": {"date": run_date}}}}我想,如果我将next_schedule变量定义为全局变量,这样就可以在装饰器中覆盖它,这会有所帮助。但我不确定这是否能解决我的问题。这里有人能帮忙吗?也许达格斯特对我的问题有一些内置的解决方案,我不知道。
发布于 2022-04-12 14:44:23
需要考虑的一种方法是每天运行您的计划,但是返回SkipReason而不是RunRequest,除非您希望在第6个工作日运行。
https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules
https://stackoverflow.com/questions/71843594
复制相似问题