catchup=False,#避免一次补几千天的“悲剧”default_args=default_args)asdag:start=EmptyOperator(task_id="start")extract=KubernetesPodOperator latest",cmds=["python","extract.py"],arguments=["--date","{{ds}}"],is_delete_operator_pod=True)transform=KubernetesPodOperator latest",cmds=["python","transform.py"],arguments=["--date","{{ds}}"],is_delete_operator_pod=True)load=KubernetesPodOperator is_delete_operator_pod=True)end=EmptyOperator(task_id="end")start>>extract>>transform>>load>>end你会发现几个核心点:用KubernetesPodOperator
对于需要更多资源的自定义作业,我们可以选择使用 KubernetesPodOperator 运行它们。
KubernetesPodOperator:在Kubernetes中执行Pod。