I just set up a Cloud Composer environment on Python 3 with Composer image version composer-1.4.0-airflow-1.10.0. Everything else is "stock"; i.e., there are no configuration overrides.
I am trying to test a very simple DAG. It runs without problems on my local Airflow server, but on Cloud Composer the web server's Task Instance Details view shows the message Dependencies Blocking Task From Getting Scheduled.
The dependency is Unknown, with the following reason:
All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:
- The scheduler is down or under heavy load
- The following configuration values may be limiting the number of queueable processes: parallelism, dag_concurrency, max_active_dag_runs_per_dag, non_pooled_task_slot_count
If this task instance does not start soon please contact your Airflow administrator for assistance.
This happens whether the tasks run on their schedule or are triggered manually from the web server (to rule out delays, I set all task instances to success before doing this). I have already tried resetting the scheduler in Kubernetes as per this answer, but the tasks still stay stuck in the scheduled state.
Also, I noticed that on my local instance (which runs the web server, workers, and scheduler in separate Docker containers), the Hostname column in the Task Instances view is populated, but on Cloud Composer it is not.
Here is the DAG I am running:
from datetime import datetime, timedelta
import random

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'airflow',
    'start_date': datetime.today() - timedelta(days=2),
    'schedule_interval': None,
    'retries': 2,
    'retry_delay': timedelta(seconds=15),
    'priority_weight': 10,
}

example_dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

def always_succeed():
    pass

always_succeed_operator = PythonOperator(
    dag=example_dag,
    python_callable=always_succeed,
    task_id='always_succeed'
)

def might_fail():
    return 1 / random.randint(0, 1)

might_fail_operator = PythonOperator(
    dag=example_dag, python_callable=might_fail, task_id='might_fail'
)
might_fail_operator.set_upstream(always_succeed_operator)

Answered on 2019-01-07 04:29:58
Cloud Composer does not support multiple Celery queues, so remove 'queue': 'airflow' from your default arguments. That should fix your problem.
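In other words, the corrected default_args would be the same dictionary as in the question, minus the 'queue' entry, so tasks land on Composer's single default Celery queue. A sketch (everything else in the DAG stays unchanged):

```python
from datetime import datetime, timedelta

# Same default_args as in the question, but with the unsupported
# 'queue': 'airflow' entry removed -- on Cloud Composer all tasks
# must go to the default Celery queue.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.today() - timedelta(days=2),
    'schedule_interval': None,
    'retries': 2,
    'retry_delay': timedelta(seconds=15),
    'priority_weight': 10,
}

# Sanity check: the key that blocked scheduling is gone.
print('queue' in default_args)
```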
https://stackoverflow.com/questions/53860020