首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Airflow:如何将xcom从父dag传递到子dag

Airflow:如何将xcom从父dag传递到子dag
EN

Stack Overflow用户
提问于 2019-02-20 09:46:19
回答 2查看 3.5K关注 0票数 5

考虑到一个将值推送到xcom的父dag,如何从子dag检索dag?

我尝试过的:

代码语言:javascript
复制
#parent_dag.py

PARENT_DAG_NAME = "MyParentDag"
CHILD_DAG_NAME = "MyChildDag"

main_dag = DAG(
  dag_id=PARENT_DAG_NAME,
  schedule_interval="@hourly",
  start_date=DAG_START_DATE
)


def push_value(**kwargs):
    ''' push into Xcom '''
    return [1, 2]

t1 = PythonOperator(task_id='push_value',
                       python_callable=push_value,
                       retries=3,
                       dag=main_dag)

subdag_1 = SubDagOperator(
  subdag=Sub_Dag1(
      PARENT_DAG_NAME,
      CHILD_DAG_NAME,
      main_dag.start_date,
      main_dag.schedule_interval,
      "'{{ ti.xcom_pull(task_ids='push_value', dag_id='" + PARENT_DAG_NAME + "' }}'"
  ),
  task_id=CHILD_DAG_NAME,
  dag=main_dag,
)
t1 >> subdag_1

和子subdag:

代码语言:javascript
复制
#subdag1.py


def use_pushed_val(pushed_val, ds, **kwargs):
    log.info(pushed_val)
    return pushed_val

def Sub_Dag1(parent_dag_name, child_dag_name, start_date, schedule_interval, pushed_val):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),
    schedule_interval=schedule_interval,
    start_date=start_date,
  )

  childTask = PythonOperator(
      task_id='child_task',
      python_callable=use_pushed_val,
      op_kwargs = {'pushed_val' : pushed_val},
      provide_context=True,
      dag=dag
    )

return dag

它返回字符串'{{ ti.xcom_pull(task_ids='push_value', dag_id='MyParentDag' }}',而不是子子日志并返回[1,2]

EN

回答 2

Stack Overflow用户

发布于 2019-02-20 10:32:30

我看到您已经设置了provide_context=True,所以这很好。这就是我如何使用**上下文参数在父dags /子dags之间传递变量。

代码语言:javascript
复制
def push_value(**context):
    context['ti'].xcom_push(key='my_key', value='my_value')

def use_pushed_val(**context):
    value_from_parent = context['ti'].xcom_pull(task_ids=t1.task_id, key='my_key')
票数 0
EN

Stack Overflow用户

发布于 2021-07-14 21:46:47

代码语言:javascript
复制
{{ti.xcom_pull(task_ids='push_value', dag_id='" + PARENT_DAG_NAME + "')}}

在上面的代码中漏掉了一个括号

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54777603

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档