我用气流编排了一些python脚本。我有一个运行几个子标记的“主”dag。我的主进程应该根据以下概述运行:

通过使用以下行,我成功地在我的主dag中实现了这个结构:
etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag
etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag我想要做的是首先运行etl_internal_sub_dag1,然后是etl_internal_sub_dag2,然后是etl_internal_sub_dag3。当etl_internal_sub_dag3完成后,我希望etl_adzuna_sub_dag、etl_adwords_sub_dag、etl_facebook_sub_dag和etl_pagespeed_sub_dag并行运行。最后,当最后四个脚本完成后,我希望运行etl_combine_sub_dag。
但是,当我运行主进程时,etl_adzuna_sub_dag、etl_adwords_sub_dag、etl_facebook_sub_dag和etl_pagespeed_sub_dag是一个接一个地运行的,而不是并行运行的。
问题:如何确保脚本etl_adzuna_sub_dag、etl_adwords_sub_dag、etl_facebook_sub_dag和etl_pagespeed_sub_dag并行运行?
编辑: My default_args和DAG如下所示:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': start_date,
'end_date': end_date,
'email': ['myname@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
DAG_NAME = 'main_dag'
dag = DAG(DAG_NAME, default_args=default_args, catchup = False)发布于 2018-10-10 13:41:26
您将需要使用LocalExecutor。
检查您的信任(airflow.cfg),您可能正在使用SequentialExectuor,它依次执行任务。
气流使用后端数据库存储元数据。检查您的airflow.cfg文件并查找executor关键字。默认情况下,气流使用SequentialExecutor,无论发生什么情况,它都会顺序执行任务。因此,为了允许气流并行运行任务,您需要在Postges或MySQL中创建数据库,并在airflow.cfg (sql_alchemy_conn param)中配置数据库,然后将执行器更改为airflow.cfg中的LocalExecutor,然后运行airflow initdb。
注意,要使用LocalExecutor,您需要使用Postgres或MySQL而不是SQLite作为后端数据库。
更多信息:https://airflow.incubator.apache.org/howto/initialize-database.html
如果您想要对气流进行真正的测试,您应该考虑建立一个真正的数据库后端并切换到LocalExecutor。由于气流是为了使用伟大的SqlAlchemy库与其元数据交互而构建的,因此您应该能够使用任何作为SqlAlchemy后端支持的数据库后端。我们建议使用MySQL或Postgres。
发布于 2022-04-06 07:40:42
并行运行任务的一个简单解决方案是将它们放在括号中。例如:task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]
有关更多信息,您可以阅读此引用文献数据科学
https://stackoverflow.com/questions/52741536
复制相似问题