首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我可以在airflow的日程安排中添加延迟吗?

我可以在airflow的日程安排中添加延迟吗?
EN

Stack Overflow用户
提问于 2018-12-01 04:43:55
回答 2查看 2.5K关注 0票数 2

我有一个我想每天运行的管道,但我希望执行日期滞后。也就是说,在X日,我希望执行日期为X-3。这样的事情有可能发生吗?

EN

回答 2

Stack Overflow用户

发布于 2018-12-04 23:09:20

看起来您正在使用execution_date作为管道逻辑中的变量。例如,处理比execution_date早3天的数据。因此,您可以从execution_date中减去延迟,并在您的管道逻辑中使用结果,而不是让execution_date延迟3天。Airflow提供了许多方法来实现此目的:

  1. Templates: {{ execution_date - macros.timedelta(days=3) }}.因此,例如,PythonOperator的bash_command参数可以是bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }} '
  2. The PythonOperator's python callable:,定义可调用的内容,如def func(execution_date, **kwargs): ...,并设置BashOperator的参数provide_context=Truefunc()execution_date参数将被设置为当前调用的执行日期(datetime对象)。因此,在func()中,您可以对任何具有context参数的传感器的poke()execute()方法执行processing_date = execution_date - timedelta(days=3).
  3. The Sensors' context parameter:操作,该参数是包含所有宏(包括execution_date )的字典。因此,在这些方法中,您可以执行processing_date = context['execution_date'] - timedelta(days=3).

强迫执行日期有一个延迟是不正确的。因为,根据气流的逻辑,当前运行的DAG的执行日期通常只有在它正在迎头赶上时才会有延迟。

票数 2
EN

Stack Overflow用户

发布于 2018-12-01 05:53:58

您可以使用TimeSensor来延迟DAG中任务的执行。我不认为您可以更改实际的execution_date,除非您可以将其行为描述为cron。

如果您希望将此延迟仅应用于计划的DAG运行的子集,则可以使用BranchPythonOperator首先检查execution_date是否是您希望延迟的日期之一。如果是,则取带有传感器的树枝。否则,在没有它的情况下继续前进。

或者,特别是如果您计划在多个DAG中实现此行为,您可以编写传感器的修改版本。它可能看起来像这样:

代码语言:javascript
复制
def poke(self, context):
    if should_delay(context['execution_date']):
        self.log.info('Checking if the time (%s) has come', self.target_time)
        return timezone.utcnow().time() > self.target_time
    else:
        self.log.info('Not one of those days, just run')
        return True

您可以在https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40中引用现有时间传感器的代码。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53564657

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档