我有一个DAG,我传递了各种配置,我想传递的设置之一是它应该多久运行一次。
例如,使用相同的DAG,我有两个不同的运行。跑A我想每天跑。我想每周跑一次。这两种方法都使用完全相同的DAG代码,但传递的配置不同。
据我所见,没有办法在配置中轻松地通过调度。我唯一的解决方案是用完全相同的代码制作多个DAG,但时间表不同,这会导致大量冗余的代码重复。
还有更好的选择吗?
举个例子,我有一个dag,它是一个网络爬虫,我把urls传递给它来爬行。基本上,我需要修改不同urls集的爬行频率。我传递的urls可能会更改,除了运行参数之外,没有其他方法可以确定使用什么计划。
发布于 2022-05-29 17:08:06
在这种情况下,因为“每日”包含每周一次,所以最好每天运行一次,并使用分支操作符根据一周中的每一天来决定使用什么逻辑。
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
with DAG(
dag_id="my_dag",
start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
) as dag:
task_a = EmptyOperator(task_id='logic_a') # Replace with your actual operators for 1st configuration/logic
task_b = EmptyOperator(task_id='logic_b') # Replace with your actual operators for 2nd configuration/logic
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="logic_a",
follow_task_ids_if_false="logic_b",
week_day="Monday",
)
branch >> [task_a, task_b]在本例中,DAG每天都在运行。周一,它将跟随task_a,本周剩下的时间,它将跟随task_b。
https://stackoverflow.com/questions/72425600
复制相似问题