我安装了气流,使用对接-合成和运行db init命令。我正在尝试创建一个使用DockerOperator执行某些脚本的DAG。在我的DockerOperator中,脚本试图读取气流变量并使用BaseHook获取连接。但是,我的DockerOperator中的脚本似乎连接到(空的) sqlalchemy数据库,而不是通过UI设置填充连接和变量的初始化postgres数据库。
是否有一种方法可以将DockerOperator上下文提供给包含通过UI设置的连接和变量的气流数据库?
我的气流DAG
from datetime import datetime
from json import dumps
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import XCom
from airflow.models.param import Param
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.db import provide_session
local_tz = pendulum.timezone("America/Los_Angeles") # will affect schedule interval time zone
args = {"owner": "Airflow"}
SYNC_SCRIPT_DAG = "sync_script_dag_v1"
@provide_session
def cleanup_xcom(session=None, **context):
print("Cleaning up!!!")
dag = context["dag"]
dag_id = dag._dag_id
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
#################### Define Tasks #####################
## Change Python Executor
with DAG(
dag_id=SYNC_SCRIPT_DAG,
default_args=args,
catchup=False,
start_date=datetime(2020, 7, 8, tzinfo=local_tz),
max_active_runs=1,
tags=["production"],
schedule_interval=None,
params={
# these env vars are expected by the script
"SOURCE_ONE": "source_two_credential", # credential ID in Airflow
"LOG_LEVEL": "INFO",
"NUM_WORKER_THREADS": "10",
},
on_failure_callback=cleanup_xcom,
on_success_callback=cleanup_xcom,
) as dag:
@task(task_id="get_airflow_params_task")
def get_airflow_params(**context):
airflow_params = context.get("params")
return dumps(airflow_params)
# mask_secret_task = mask_secret()
get_params_task = get_airflow_params()
new_task = DockerOperator(
task_id="my_task_id",
image="sync-script:latest",
api_version="auto",
auto_remove=True,
# SOURCE_ONE_JIRA_USER should evaluate to the connection dict for Airflow source_two_credential ID
command="""main.py
--SOURCE_ONE_JIRA_USER '{{ conn[params.SOURCE_TWO] }}'
--airflow_params '{{ ti.xcom_pull(task_ids='get_airflow_params_task')}}'
""",
environment={
"YML_CONFIG": "yml_config",
},
docker_url="unix://var/run/docker.sock",
network_mode="host",
force_pull=True,
docker_conn_id="harbor_credentials",
)
get_params_task >> new_task现在知道没有办法连接到气流变量或连接从DockerOperator,我删除了最初的错误,我得到。
我所希望的是一种通过气流params (一个connection_id )传入的方法,然后使用DockerOperator将该连接字典传递给我的DockerOperator脚本。这允许触发dag的不同用户传入他们自己的credential_id,而不必覆盖它。
发布于 2022-06-07 18:36:25
在DockerOperator中执行的代码是在一个“封闭环境”中,它不能访问气流资源。如果您需要它们,那么在初始化DockerOperator时必须传递它们。
command参数是模板字段,所以只需使用Jinja实现:
p1_auth_task = DockerOperator(
task_id="auth_v1",
image="tako:latest",
api_version="auto",
auto_remove=True,
command="auth.py --airflowmode {{ var.value.MY_INITIALIZED_VAR }}",
docker_url="unix://var/run/docker.sock",
network_mode="bridge",
mount_tmp_dir=False
)如果您愿意,也可以通过environment。
编辑:,因为您是在连接之后,然后使用use Airflow connection from a jinja template中解释的连接宏
就你而言:
epic_task = DockerOperator(
task_id="epic_task",
image="sync-script:latest",
api_version="auto",
auto_remove=True,
command="main.py",
environment={
"YML_CONFIG": "configs/epic_config.yaml",
"AIRFLOW_PARAMS_SOURCE_ONE_HOST": "{{ conn.SOURCE_ONE.host }}",
"AIRFLOW_PARAMS_SOURCE_TWO_HOST": "{{ conn.SOURCE_TWO.host }}",
},
docker_url="unix://var/run/docker.sock",
network_mode="host",
force_pull=True
)https://stackoverflow.com/questions/72482017
复制相似问题