我有一个我想每天运行的管道,但我希望执行日期滞后。也就是说,在X日,我希望执行日期为X-3。这样的事情有可能发生吗?
发布于 2018-12-04 23:09:20
看起来您正在使用execution_date作为管道逻辑中的变量。例如,处理比execution_date早3天的数据。因此,您可以从execution_date中减去延迟,并在您的管道逻辑中使用结果,而不是让execution_date延迟3天。Airflow提供了许多方法来实现此目的:
{{ execution_date - macros.timedelta(days=3) }}.因此,例如,PythonOperator的bash_command参数可以是bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }} 'def func(execution_date, **kwargs): ...,并设置BashOperator的参数provide_context=True。func()的execution_date参数将被设置为当前调用的执行日期(datetime对象)。因此,在func()中,您可以对任何具有context参数的传感器的poke()和execute()方法执行processing_date = execution_date - timedelta(days=3).context parameter:操作,该参数是包含所有宏(包括execution_date )的字典。因此,在这些方法中,您可以执行processing_date = context['execution_date'] - timedelta(days=3).强迫执行日期有一个延迟是不正确的。因为,根据气流的逻辑,当前运行的DAG的执行日期通常只有在它正在迎头赶上时才会有延迟。
发布于 2018-12-01 05:53:58
您可以使用TimeSensor来延迟DAG中任务的执行。我不认为您可以更改实际的execution_date,除非您可以将其行为描述为cron。
如果您希望将此延迟仅应用于计划的DAG运行的子集,则可以使用BranchPythonOperator首先检查execution_date是否是您希望延迟的日期之一。如果是,则取带有传感器的树枝。否则,在没有它的情况下继续前进。
或者,特别是如果您计划在多个DAG中实现此行为,您可以编写传感器的修改版本。它可能看起来像这样:
def poke(self, context):
if should_delay(context['execution_date']):
self.log.info('Checking if the time (%s) has come', self.target_time)
return timezone.utcnow().time() > self.target_time
else:
self.log.info('Not one of those days, just run')
return True您可以在https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40中引用现有时间传感器的代码。
https://stackoverflow.com/questions/53564657
复制相似问题