首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >希望创建当前任务下游的气流任务。

希望创建当前任务下游的气流任务。
EN

Stack Overflow用户
提问于 2018-01-10 23:26:11
回答 1查看 1K关注 0票数 1

我对气流的认识大多是全新的。

我有两个步骤:

  1. 获取所有符合条件的文件
  2. 解压缩文件

文件被压缩了一半,解压缩时是2-3千兆。我可以很容易地让20+文件一次处理,这意味着解压缩所有文件的运行时间比任何合理的超时时间都要长。

我可以使用XCom获得第1步的结果,但我想做的是如下所示:

代码语言:javascript
复制
def processFiles (reqDir, gvcfDir, matchSuffix):
    theFiles = getFiles (reqDir, gvcfDir, matchSuffix)

    for filePair in theFiles:
        task = PythonOperator (task_id = "Uncompress_" + os.path.basename (theFile), 
                                python_callable = expandFile, 
                                op_kwargs = {'theFile': theFile}, 
                                dag = dag)
task.set_upstream (runThis)

问题是"runThis“是调用processFiles的PythonOperator,因此必须在processFiles之后声明。

有什么办法让这件事成功吗?

这就是XCom存在的原因吗?我应该放弃这种方法,转而使用XCom吗?

EN

回答 1

Stack Overflow用户

发布于 2018-01-14 21:40:09

关于您建议的解决方案,我不认为您可以使用XComs来实现这一点,因为它们仅适用于实例,而不是当您定义DAG时(据我所知)。

但是,您可以使用SubDAG来实现您的目标。SubDagOperator将获得一个函数,该函数将在执行运算符时调用,并生成DAG,从而使您有机会动态创建工作流的一个子部分。

您可以使用这个简单的例子来测试这个想法,它每次调用任务时都会生成随机的任务:

代码语言:javascript
复制
import airflow
from builtins import range
from random import randint
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(dag_id='dynamic_dag', default_args=args)

def generate_subdag(parent_dag, dag_id, default_args):
    # pseudo-randomly determine a number of tasks to be created
    n_tasks = randint(1, 10)

    subdag = DAG(
        '%s.%s' % (parent_dag.dag_id, dag_id),
        schedule_interval=parent_dag.schedule_interval,
        start_date=parent_dag.start_date,
        default_args=default_args
    )
    for i in range(n_tasks):
        i = str(i)
        task = BashOperator(task_id='echo_%s' % i, bash_command='echo %s' % i, dag=subdag)

    return subdag

subdag_dag_id = 'dynamic_subdag'

SubDagOperator(
    subdag=generate_subdag(dag, subdag_dag_id, args),
    task_id=subdag_dag_id,
    dag=dag
)

如果您执行此操作,您将注意到在不同的运行中,SubDAG可能包含不同数量的任务(我用1.8.0版本进行了测试)。您可以访问图形视图,单击灰色SubDAG节点,然后单击“缩放到SubDAG”,从而访问WebUI上的SubDAG视图。

您可以通过列出文件并为每个文件创建一个任务来使用这个概念,而不只是像示例中那样以随机数生成它们。任务本身可以以并行方式(如我所做的)、顺序安排或任何有效的定向无循环布局来安排。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48197709

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档