首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何安排两个DAG在两个不同的schedule_intervals中运行,但第二个只在第一个完成后运行

如何安排两个DAG在两个不同的schedule_intervals中运行,但第二个只在第一个完成后运行
EN

Stack Overflow用户
提问于 2019-08-26 14:28:19
回答 3查看 1.2K关注 0票数 0

我有两个不同的DAG需要在不同的频率上运行。一种是dag1需要每周运行一次,dag2需要每天运行。现在,dag2应该只在dag1完成时运行,在每次运行dag1时都会运行。

我在两个不同的python模块中定义了两个DAG,如下所示。

dag1.py

代码语言:javascript
复制
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
     default_args={
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='00 10 * * 1',
     catchup=True
    ) as dag:

CRAWL_PARAMS = BashOperator(
    task_id='crawl_params',
    bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)

dag2.py

代码语言:javascript
复制
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
     default_args = {
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='5 10 * * *',
     catchup=True
    ) as dag:

CRAWL_DATASET = BashOperator(
    task_id='crawl_dataset',
    bash_command='''
        cd {}/scraper && scrapy crawl crawl_dataset
    '''.format(PROJECT_PATH)
)

目前,我已经手动设置了5分钟间隔之间的两个达格。此设置目前无法工作,并且也缺乏使dag2根据需要依赖于dag1的功能。

我已经核对了这里这里的答案,但没能弄清楚。

注:schedule_intervals仅供参考。这样做的目的是每个星期一在一个固定的时间运行dag1,并在一个固定的时间每天运行dag2,而在周一,它应该只在dag1完成之后运行。在这里,每个守护进程也有多个任务。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2019-08-28 10:24:05

在对流程的理解做了很大的努力之后,我终于自己想出了答案(不知道它有多好,但目前对我来说是可行的)。感谢答案和分支文档。下面是我使用BranchPythonOperator的解决方案。

dag1.py

代码语言:javascript
复制
import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': dt.datetime(2019, 8, 20),
    'concurrency': 1,
    'retries': 0
}

def branch_tasks(execution_date, **kwargs):
    '''
    Branch the tasks based on weekday.
    '''
    # check if the execution day is 'Saturday'
    if execution_date.weekday() == 5:
        return ['crawl_params', 'crawl_dataset']

    return 'crawl_dataset'

with DAG('dag1',
         default_args=DEFAULT_ARGS,
         schedule_interval='00 10 * * *',
         catchup=False
        ) as dag:

    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
        trigger_rule='none_failed'
    )

BRANCH_OP = BranchPythonOperator(
    task_id='branch_tasks',
    provide_context=True,
    python_callable=branch_tasks,
    dag=dag
)

BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
CRAWL_PARAMS.set_downstream(CRAWL_DATASET)

在这里,BranchPythonOperator使用branch_tasks函数根据每周的哪一天来选择要运行的任务。

这里的另一个问题是,当crawl_params运行时,它的条件为真,下行也会运行,但是当它被跳过时,它的下行流也会被跳过。为了避免这种情况,我们需要将trigger_rule='none_failed'传递给任务的操作符。这意味着,如果上游的任务没有一个失败(它们要么成功了,要么被跳过),任务就应该运行。

票数 0
EN

Stack Overflow用户

发布于 2019-08-26 15:46:23

  1. 最简单的解决方案是使用ExternalTaskSensor开始第二个DAG,等待完成第一个DAG的最后一个任务。
  2. 或者,您也可以使用TriggerDagRunOperator在第一个进程结束时触发第二个守护进程。但是,在这种情况下,您将无法将一个schedule_interval分配给第二个进程(因为它将由第一个进程“强制”触发)。
票数 0
EN

Stack Overflow用户

发布于 2019-08-27 09:20:28

您可以在同一个DAG中写入这两个任务,并有一个下游来设置任务依赖项。

代码语言:javascript
复制
task1.set_downstream(task2)

对于不同的任务调度依赖项,使用每日计划创建DAG。对于具有每周计划的任务,编写一个shortCircuitOperator以启用每周触发器:

代码语言:javascript
复制
# Set trigger for first day of the week
def check_trigger_week(execution_date, **kwargs):
    return execution_date.weekday() == 0

# Task should check for the trigger to see if its first day of the week
check_trigger_weekly = ShortCircuitOperator(
  task_id='check_trigger_weekly',
  python_callable=check_trigger_week,
  provide_context=True,
  dag=dag
)

然后使您的每周任务依赖于此周触发器。

代码语言:javascript
复制
check_trigger_weekly.set_downstream(task)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57659882

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档