与前面的问题类似,但给出的答案都不起作用。我有一个DAG:
import datetime
import os
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators import BashOperator
PROJECT = os.environ['PROJECT']
GCS_BUCKET = os.environ['BUCKET']
API_KEY = os.environ['API_KEY']
default_args = {
'owner': 'me',
'start_date': datetime.datetime(2019, 7, 30),
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': datetime.timedelta(hours=1),
'catchup': False
}
dag = DAG('dag-name',
schedule_interval=datetime.timedelta(hours=2),
default_args=default_args,
max_active_runs=1,
concurrency=1,
catchup=False)
DEFAULT_OPTIONS_TEMPLATE = {
'project': PROJECT,
'stagingLocation': 'gs://{}/staging'.format(GCS_BUCKET),
'tempLocation': 'gs://{}/temp'.format(GCS_BUCKET)
}
def my-dataflow-job(template_location, name):
run_time = datetime.datetime.utcnow()
a_value = run_time.strftime('%Y%m%d%H')
t1 = DataflowTemplateOperator(
task_id='{}-task'.format(name),
template=template_location,
parameters={'an_argument': a_value},
dataflow_default_options=DEFAULT_OPTIONS_TEMPLATE,
poll_sleep=30
)
t2 = BashOperator(
task_id='{}-loader-heartbeat'.format(name),
bash_command='curl --fail -XGET "[a heartbeat URL]" --header "Authorization: heartbeat_service {1}"'.format(name, API_KEY)
)
t1 >> t2
with dag:
backup_bt_to_bq('gs://[path to gcs]'.format(GCS_BUCKET), 'name')如你所见,我正在努力阻止气流回填。然而,当我部署DAG时(当天晚些时候,2019年7月30日),它只是一个接一个地运行DAG,一个接一个,一个接一个。
由于此任务需要移动一些数据,因此这是不可取的。如何让airflow遵守“每隔一小时运行一次”schedule_interval??
如您所见,我在DAG args和默认args中都设置了catchup: False (只是以防万一,在DAG args中从它们开始)。重试延迟也是一个很长的周期。
每次DAG运行都报告为成功。我使用以下版本运行:
composer-1.5.0-airflow-1.10.1
我的下一步是kubernetes cron...
发布于 2019-08-06 16:01:47
我怀疑您在第一次创建dag时还没有catchup=False。我认为airflow在初始dag创建后可能无法识别catchup参数的更改。
试着重命名它,看看会发生什么。例如,添加一个v2并启用它。启用后,即使catchup为false,它也会运行一次,因为存在有效的完成间隔(即当前时间是>= start_time + schedule_interval),但仅此而已。
当然,使用不做任何昂贵操作的假操作符进行测试。
https://stackoverflow.com/questions/57281279
复制相似问题