首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >气流DAG的功能?

气流DAG的功能?
EN

Stack Overflow用户
提问于 2018-10-29 21:11:44
回答 2查看 3.5K关注 0票数 6

我在$AIRFLOW_HOME/dags工作。我创建了以下文件:

代码语言:javascript
复制
- common
  |- __init__.py   # empty
  |- common.py     # common code
- foo_v1.py        # dag instanciation

common.py

代码语言:javascript
复制
default_args = ...

def create_dag(project, version):
  dag_id = project + '_' + version
  dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
  print('creating DAG ' + dag_id)

  t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

  t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

  t2.set_upstream(t1)

foo_v1.py

代码语言:javascript
复制
 from common.common import create_dag

 create_dag('foo', 'v1')

在用python测试脚本时,它看起来很好:

代码语言:javascript
复制
 $ python foo_v1.py
 [2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
 creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0

然后在本地启动and服务器和调度程序。我的问题是我没有看到任何带有id foo_v1的DAG。没有创建pyc文件。究竟做错了什么?为什么没有执行foo_v1.py中的代码?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-10-30 09:21:26

要通过气流找到DAG对象,create_dag()返回的DAG对象必须位于foo_v1.py模块的全局命名空间中。将DAG放置在全局命名空间中的一种方法是将其分配给模块级变量:

代码语言:javascript
复制
from common.common import create_dag

dag = create_dag('foo', 'v1')

另一种方法是使用globals()更新全局命名空间。

代码语言:javascript
复制
globals()['foo_v1'] = create_dag('foo', 'v1')

后者看起来可能有点过火,但对于动态创建多个DAG来说却很有用。例如,在For -循环中:

代码语言:javascript
复制
for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')

注意:放置在*.py中的任何$AIRFLOW_HOME/dags文件(即使是在子目录中,比如您的例子中的common )都将被气流解析。如果您不想这样做,可以使用.airflowignore封装DAGs

票数 8
EN

Stack Overflow用户

发布于 2018-10-29 21:25:37

您需要将dag分配给模块中的导出变量。如果dag不在模块中,__dict__气流的DagBag处理器就不会把它捡起来。

查看这里的源代码:https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53053895

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档