首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >DockerOperator存取气流变量

DockerOperator存取气流变量
EN

Stack Overflow用户
提问于 2022-06-02 20:30:26
回答 1查看 339关注 0票数 1

我安装了气流,使用对接-合成和运行db init命令。我正在尝试创建一个使用DockerOperator执行某些脚本的DAG。在我的DockerOperator中,脚本试图读取气流变量并使用BaseHook获取连接。但是,我的DockerOperator中的脚本似乎连接到(空的) sqlalchemy数据库,而不是通过UI设置填充连接和变量的初始化postgres数据库。

是否有一种方法可以将DockerOperator上下文提供给包含通过UI设置的连接和变量的气流数据库?

我的气流DAG

代码语言:javascript
复制
from datetime import datetime
from json import dumps

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import XCom
from airflow.models.param import Param
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.db import provide_session


local_tz = pendulum.timezone("America/Los_Angeles")  # will affect schedule interval time zone

args = {"owner": "Airflow"}

SYNC_SCRIPT_DAG = "sync_script_dag_v1"


@provide_session
def cleanup_xcom(session=None, **context):
    print("Cleaning up!!!")
    dag = context["dag"]
    dag_id = dag._dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()


#################### Define Tasks #####################
## Change Python Executor
with DAG(
    dag_id=SYNC_SCRIPT_DAG,
    default_args=args,
    catchup=False,
    start_date=datetime(2020, 7, 8, tzinfo=local_tz),
    max_active_runs=1,
    tags=["production"],
    schedule_interval=None,
    params={
        # these env vars are expected by the script
        "SOURCE_ONE": "source_two_credential",  # credential ID in Airflow 
        "LOG_LEVEL": "INFO",
        "NUM_WORKER_THREADS": "10",
    },
    on_failure_callback=cleanup_xcom,
    on_success_callback=cleanup_xcom,
) as dag:

    @task(task_id="get_airflow_params_task")
    def get_airflow_params(**context):
        airflow_params = context.get("params")
        return dumps(airflow_params)

    # mask_secret_task = mask_secret()
    get_params_task = get_airflow_params()

    new_task = DockerOperator(
        task_id="my_task_id",
        image="sync-script:latest",
        api_version="auto",
        auto_remove=True,

        # SOURCE_ONE_JIRA_USER should evaluate to the connection dict for Airflow source_two_credential ID
        command="""main.py
            --SOURCE_ONE_JIRA_USER '{{ conn[params.SOURCE_TWO] }}'
            --airflow_params '{{ ti.xcom_pull(task_ids='get_airflow_params_task')}}'
        """,
        environment={
            "YML_CONFIG": "yml_config",
        },
        docker_url="unix://var/run/docker.sock",
        network_mode="host",
        force_pull=True,
        docker_conn_id="harbor_credentials",
    )

    get_params_task >> new_task

现在知道没有办法连接到气流变量或连接从DockerOperator,我删除了最初的错误,我得到。

我所希望的是一种通过气流params (一个connection_id )传入的方法,然后使用DockerOperator将该连接字典传递给我的DockerOperator脚本。这允许触发dag的不同用户传入他们自己的credential_id,而不必覆盖它。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-07 18:36:25

DockerOperator中执行的代码是在一个“封闭环境”中,它不能访问气流资源。如果您需要它们,那么在初始化DockerOperator时必须传递它们。

command参数是模板字段,所以只需使用Jinja实现:

代码语言:javascript
复制
p1_auth_task = DockerOperator(
    task_id="auth_v1",
    image="tako:latest",
    api_version="auto",
    auto_remove=True,
    command="auth.py --airflowmode {{ var.value.MY_INITIALIZED_VAR }}",
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    mount_tmp_dir=False
)

如果您愿意,也可以通过environment

编辑:,因为您是在连接之后,然后使用use Airflow connection from a jinja template中解释的连接宏

就你而言:

代码语言:javascript
复制
epic_task = DockerOperator(
    task_id="epic_task",
    image="sync-script:latest",
    api_version="auto",
    auto_remove=True,
    command="main.py",
    environment={
        "YML_CONFIG": "configs/epic_config.yaml",
        "AIRFLOW_PARAMS_SOURCE_ONE_HOST": "{{ conn.SOURCE_ONE.host }}",
        "AIRFLOW_PARAMS_SOURCE_TWO_HOST": "{{ conn.SOURCE_TWO.host }}",
    },
    docker_url="unix://var/run/docker.sock",
    network_mode="host",
    force_pull=True
)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72482017

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档