我对气流的认识大多是全新的。
我有两个步骤:
文件被压缩了一半,解压缩时是2-3千兆。我可以很容易地让20+文件一次处理,这意味着解压缩所有文件的运行时间比任何合理的超时时间都要长。
我可以使用XCom获得第1步的结果,但我想做的是如下所示:
def processFiles (reqDir, gvcfDir, matchSuffix):
theFiles = getFiles (reqDir, gvcfDir, matchSuffix)
for filePair in theFiles:
task = PythonOperator (task_id = "Uncompress_" + os.path.basename (theFile),
python_callable = expandFile,
op_kwargs = {'theFile': theFile},
dag = dag)
task.set_upstream (runThis)问题是"runThis“是调用processFiles的PythonOperator,因此必须在processFiles之后声明。
有什么办法让这件事成功吗?
这就是XCom存在的原因吗?我应该放弃这种方法,转而使用XCom吗?
发布于 2018-01-14 21:40:09
关于您建议的解决方案,我不认为您可以使用XComs来实现这一点,因为它们仅适用于实例,而不是当您定义DAG时(据我所知)。
但是,您可以使用SubDAG来实现您的目标。SubDagOperator将获得一个函数,该函数将在执行运算符时调用,并生成DAG,从而使您有机会动态创建工作流的一个子部分。
您可以使用这个简单的例子来测试这个想法,它每次调用任务时都会生成随机的任务:
import airflow
from builtins import range
from random import randint
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(dag_id='dynamic_dag', default_args=args)
def generate_subdag(parent_dag, dag_id, default_args):
# pseudo-randomly determine a number of tasks to be created
n_tasks = randint(1, 10)
subdag = DAG(
'%s.%s' % (parent_dag.dag_id, dag_id),
schedule_interval=parent_dag.schedule_interval,
start_date=parent_dag.start_date,
default_args=default_args
)
for i in range(n_tasks):
i = str(i)
task = BashOperator(task_id='echo_%s' % i, bash_command='echo %s' % i, dag=subdag)
return subdag
subdag_dag_id = 'dynamic_subdag'
SubDagOperator(
subdag=generate_subdag(dag, subdag_dag_id, args),
task_id=subdag_dag_id,
dag=dag
)如果您执行此操作,您将注意到在不同的运行中,SubDAG可能包含不同数量的任务(我用1.8.0版本进行了测试)。您可以访问图形视图,单击灰色SubDAG节点,然后单击“缩放到SubDAG”,从而访问WebUI上的SubDAG视图。
您可以通过列出文件并为每个文件创建一个任务来使用这个概念,而不只是像示例中那样以随机数生成它们。任务本身可以以并行方式(如我所做的)、顺序安排或任何有效的定向无循环布局来安排。
https://stackoverflow.com/questions/48197709
复制相似问题