问题:
在气流中有什么最佳的方法来保持简单吗?我在想(没有具体的顺序):
我找不到关于这个特殊用例的很多材料。
理想情况下,我们有一个“模板”,这是每个客户重复使用。目前还不清楚一个工作还是多个工作是最好的解决方案。或者还有另一种更适合这种用法的方法?
发布于 2020-01-02 13:13:49
气流对Google平台有着广泛的支持。但是请注意,大多数钩子和操作符都在cont肋骨部分,这意味着它们具有beta状态,这意味着它们可以在小版本之间进行中断更改。
客户端的数量:
可以有尽可能多的DAG需要,其中每一个都可以提到多个任务。建议将一个逻辑工作流保存在一个DAG文件中,并尽量保持它很轻(例如,配置文件)。它允许花费更少的时间和资源,使气流调度器在每一次心跳时处理它们。
可以基于任意数量的配置参数动态创建DAG(具有相同的基本代码),这在有大量客户端时非常有用,而且节省时间。
若要创建新的DAG,请在create_dag函数中创建DAG模板。可以将代码包装在允许传递自定义参数的方法中。此外,输入参数不必存在于dag文件本身中。生成DAG的另一种常见形式是在Variable对象中设置值。请参阅这里以获取更多信息。
特定于的客户信任:
可以使用宏在运行时将动态信息传递到任务实例。可在所有模板中访问的默认变量列表可以找到这里。
气流对金佳模板的内置支持使用户能够传递参数,这些参数可以在模板化的领域中使用。
UI概述
如果您的守护进程需要很长时间来加载,则可以将airflow.cfg中的airflow.cfg配置值减少到一个较小的值。这种可配置的操作控制运行在默认值为25的UI中显示的守护进程的数量。
模数
如果将default_args的字典传递给DAG,它将将它们应用于其任何运算符。这样就可以很容易地将公共参数应用于许多操作符,而不必多次键入它。
看一看,例如:
from datetime import datetime, timedelta
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow有关BaseOperator参数及其所做工作的更多信息,请参阅airflow.models.BaseOperator文档。
性能
可以使用变量,您可以控制这些变量以提高气流DAG性能(可以在airflow.cfg中设置):
parallelism:控制在整个气流集群中同时运行的任务实例的数量。concurrency:在任何给定的时间,气流调度程序只运行于DAG的并发任务实例。并发性在您的气流DAG中定义。如果不在DAG上设置并发性,调度程序将使用来自dag_concurrency条目的airflow.cfg默认值。task_concurrency:这个变量控制每个任务跨dag_runs的并发运行任务实例的数量。max_active_runs:在给定的时间内,气流调度程序只运行于DAG的max_active_runs DagRuns。pool:这个变量控制分配给池的并发运行任务实例的数量。您可以在composer实例桶gs://composer_instance_bucket/airflow.cfg中看到气流配置。您可以根据自己的意愿对此配置进行调优,但请记住,composer的某些配置已被阻塞。
标度
请记住,建议节点数必须大于3,如果要升级可以使用gcloud命令指定此值的节点数,则将此数目保持在3以下可能会引起一些问题。此外,请注意,有一些气流配置与自动呼叫阻塞,不能是重写。有些气流配置是为Composer预先配置的,您不能更改它们。
Fault-tolerance
请参阅以下文档。
Re-execution
就像对象是类的实例一样,气流任务也是运算符 (BaseOperator)的实例。因此,编写一个“可重用”操作符,并通过不同的等值线在管道上使用数百次。
潜伏期
使用以下方法可以减少生产中的气流DAG调度延迟:
max_threads:调度程序将生成与调度dags并行的多个线程。这是由默认值为2的max_threads控制的。scheduler_heartbeat_sec:用户应该考虑将scheduler_heartbeat_sec配置提高到更高的值(例如60秒),该值控制气流调度程序获取心跳的频率并更新数据库中作业的条目。请参阅以下关于最佳做法的文章:
我希望它能在某种程度上帮助你。
https://stackoverflow.com/questions/59510874
复制相似问题