首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >尽管有catchup=False、schedule_interval=datetime.timedelta(hours=2),Airflow仍在运行我的数据采集卡

尽管有catchup=False、schedule_interval=datetime.timedelta(hours=2),Airflow仍在运行我的数据采集卡
EN

Stack Overflow用户
提问于 2019-07-31 07:26:39
回答 1查看 330关注 0票数 0

与前面的问题类似,但给出的答案都不起作用。我有一个DAG:

代码语言:javascript
复制
import datetime
import os

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.operators import BashOperator

PROJECT = os.environ['PROJECT']
GCS_BUCKET = os.environ['BUCKET']
API_KEY = os.environ['API_KEY']

default_args = {
    'owner': 'me',
    'start_date': datetime.datetime(2019, 7, 30),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(hours=1),
    'catchup': False
}

dag = DAG('dag-name',
          schedule_interval=datetime.timedelta(hours=2),
          default_args=default_args,
          max_active_runs=1,
          concurrency=1,
          catchup=False)

DEFAULT_OPTIONS_TEMPLATE = {
    'project': PROJECT,
    'stagingLocation': 'gs://{}/staging'.format(GCS_BUCKET),
    'tempLocation': 'gs://{}/temp'.format(GCS_BUCKET)
}

def my-dataflow-job(template_location, name):
    run_time = datetime.datetime.utcnow()
    a_value = run_time.strftime('%Y%m%d%H')

    t1 = DataflowTemplateOperator(
        task_id='{}-task'.format(name),
        template=template_location,
        parameters={'an_argument': a_value},
        dataflow_default_options=DEFAULT_OPTIONS_TEMPLATE,
        poll_sleep=30
    )

    t2 = BashOperator(
        task_id='{}-loader-heartbeat'.format(name),
        bash_command='curl --fail -XGET "[a heartbeat URL]" --header "Authorization: heartbeat_service {1}"'.format(name, API_KEY)
    )

    t1 >> t2

with dag:
    backup_bt_to_bq('gs://[path to gcs]'.format(GCS_BUCKET), 'name')

如你所见,我正在努力阻止气流回填。然而,当我部署DAG时(当天晚些时候,2019年7月30日),它只是一个接一个地运行DAG,一个接一个,一个接一个。

由于此任务需要移动一些数据,因此这是不可取的。如何让airflow遵守“每隔一小时运行一次”schedule_interval??

如您所见,我在DAG args和默认args中都设置了catchup: False (只是以防万一,在DAG args中从它们开始)。重试延迟也是一个很长的周期。

每次DAG运行都报告为成功。我使用以下版本运行:

composer-1.5.0-airflow-1.10.1

我的下一步是kubernetes cron...

EN

回答 1

Stack Overflow用户

发布于 2019-08-06 16:01:47

我怀疑您在第一次创建dag时还没有catchup=False。我认为airflow在初始dag创建后可能无法识别catchup参数的更改。

试着重命名它,看看会发生什么。例如,添加一个v2并启用它。启用后,即使catchup为false,它也会运行一次,因为存在有效的完成间隔(即当前时间是>= start_time + schedule_interval),但仅此而已。

当然,使用不做任何昂贵操作的假操作符进行测试。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57281279

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档