我试图控制一个任务在哪个气流工作者上执行,但是DAG定义中的队列参数没有被调度程序捕获。
我在subdag操作符中定义了一个队列:
xdata_run_etl = sub_dag_operator_with_celery_executor(
subdag = build_xdata_etl_dag(dag, 'xdata_run_etl'),
task_id = 'xdata_run_etl',
dag = dag,
trigger_rule='none_failed',
queue='subdag'
)我可以看到队列设置已经被选中了。在UI中的“任务属性”部分中,queue设置为subdag。
但是,当我触发DAG时,调度程序仍然将任务发送到默认队列。正如调度程序日志所观察到的那样:
: [2020-04-02 20:38:49,581] {scheduler_job.py:1168} INFO - Sending ('run_etl', 'xdata_run_etl', datetime.datetime(2020, 4, 2, 17, 27, 38, 368220, tzininfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 10) to executor with priority 2 and queue default预期的行为是将此任务发送到subdag队列,并在正在侦听此队列的气流工作人员上运行。(airflow worker -q subdag)。实际行为是,所有任务都被发送到默认队列,而不管队列参数的定义如何。
气流版本: 1.10.9
发布于 2022-05-25 08:47:27
这可能是由于手动执行了一个dag,但是您也可以尝试将airflow.cfg中的airflow.cfg参数设置为您的自定义队列名,它将工作。
https://stackoverflow.com/questions/61007571
复制相似问题