我有两个不同的DAG需要在不同的频率上运行。一种是dag1需要每周运行一次,dag2需要每天运行。现在,dag2应该只在dag1完成时运行,在每次运行dag1时都会运行。
我在两个不同的python模块中定义了两个DAG,如下所示。
dag1.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag1',
default_args={
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='00 10 * * 1',
catchup=True
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)dag2.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag2',
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='5 10 * * *',
catchup=True
) as dag:
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='''
cd {}/scraper && scrapy crawl crawl_dataset
'''.format(PROJECT_PATH)
)目前,我已经手动设置了5分钟间隔之间的两个达格。此设置目前无法工作,并且也缺乏使dag2根据需要依赖于dag1的功能。
注:schedule_intervals仅供参考。这样做的目的是每个星期一在一个固定的时间运行dag1,并在一个固定的时间每天运行dag2,而在周一,它应该只在dag1完成之后运行。在这里,每个守护进程也有多个任务。
发布于 2019-08-28 10:24:05
在对流程的理解做了很大的努力之后,我终于自己想出了答案(不知道它有多好,但目前对我来说是可行的)。感谢这答案和分支文档。下面是我使用BranchPythonOperator的解决方案。
dag1.py
import datetime as dt
from os import path
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
DEFAULT_ARGS = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 20),
'concurrency': 1,
'retries': 0
}
def branch_tasks(execution_date, **kwargs):
'''
Branch the tasks based on weekday.
'''
# check if the execution day is 'Saturday'
if execution_date.weekday() == 5:
return ['crawl_params', 'crawl_dataset']
return 'crawl_dataset'
with DAG('dag1',
default_args=DEFAULT_ARGS,
schedule_interval='00 10 * * *',
catchup=False
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
trigger_rule='none_failed'
)
BRANCH_OP = BranchPythonOperator(
task_id='branch_tasks',
provide_context=True,
python_callable=branch_tasks,
dag=dag
)
BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
CRAWL_PARAMS.set_downstream(CRAWL_DATASET)在这里,BranchPythonOperator使用branch_tasks函数根据每周的哪一天来选择要运行的任务。
这里的另一个问题是,当crawl_params运行时,它的条件为真,下行也会运行,但是当它被跳过时,它的下行流也会被跳过。为了避免这种情况,我们需要将trigger_rule='none_failed'传递给任务的操作符。这意味着,如果上游的任务没有一个失败(它们要么成功了,要么被跳过),任务就应该运行。
发布于 2019-08-26 15:46:23
ExternalTaskSensor开始第二个DAG,等待完成第一个DAG的最后一个任务。TriggerDagRunOperator在第一个进程结束时触发第二个守护进程。但是,在这种情况下,您将无法将一个schedule_interval分配给第二个进程(因为它将由第一个进程“强制”触发)。发布于 2019-08-27 09:20:28
您可以在同一个DAG中写入这两个任务,并有一个下游来设置任务依赖项。
task1.set_downstream(task2)对于不同的任务调度依赖项,使用每日计划创建DAG。对于具有每周计划的任务,编写一个shortCircuitOperator以启用每周触发器:
# Set trigger for first day of the week
def check_trigger_week(execution_date, **kwargs):
return execution_date.weekday() == 0
# Task should check for the trigger to see if its first day of the week
check_trigger_weekly = ShortCircuitOperator(
task_id='check_trigger_weekly',
python_callable=check_trigger_week,
provide_context=True,
dag=dag
)然后使您的每周任务依赖于此周触发器。
check_trigger_weekly.set_downstream(task)https://stackoverflow.com/questions/57659882
复制相似问题