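I'm trying out ZenML, which says it can convert my .py pipeline into an Airflow DAG. I followed every step here: https://docs.zenml.io/guides/low-level-api/chapter-7, and they all succeeded.
My pipeline runs fine locally, but why can't I see the DAG it creates in the Airflow UI? The UI is completely empty.
The problem seems to be that ZenML copies the .py pipeline I wrote the ZenML way and expects it to run in Airflow as-is... which doesn't work in my case. Does anyone know how to get ZenML to run my code through Airflow successfully?
Here is my ZenML .py code: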
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score
from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.steps.step_output import Output
from zenml.steps.base_step_config import BaseStepConfig

class pipeline_config(BaseStepConfig):
    """
    Params used in the pipeline
    """
    label: str = 'species'


@step
def split_data(config: pipeline_config) -> Output(
    X=pd.DataFrame, y=pd.DataFrame
):
    path_to_csv = os.path.join('~/airflow/data', 'leaf.csv')
    df = pd.read_csv(path_to_csv)
    label = config.label
    y = df[[label]]
    X = df.drop(label, axis=1)
    return X, y


@step
def train_evaltor(
    config: pipeline_config,
    X: pd.DataFrame,
    y: pd.DataFrame
) -> float:
    y = y[config.label]
    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
    lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
    metrics_lst = []
    for train_idx, val_idx in folds.split(X, y):
        X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
        lgbm.fit(X_train, y_train)
        y_pred = lgbm.predict(X_val)
        cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
        metrics_lst.append(cv_balanced_accuracy)
    avg_performance = np.mean(metrics_lst)
    print(f"Avg Performance: {avg_performance}")
    return avg_performance


@pipeline
def super_mini_pipeline(
    data_spliter,
    train_evaltor
):
    X, y = data_spliter()
    train_evaltor(X=X, y=y)

# run the pipeline
pipeline = super_mini_pipeline(data_spliter=split_data(),
                               train_evaltor=train_evaltor())
pipeline.run()

Posted on 2021-11-16 12:56:41
OK, it works now!

[screenshot]

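The reason is that, when safe_mode is on (which it is by default), the strings airflow and DAG must both appear in the file, otherwise Airflow skips it during DAG discovery. This is Airflow-specific logic that can be found in the Airflow codebase.
So all I did was change the last few lines: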
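The check described above can be sketched in a few lines. This is only an illustration of the heuristic, not Airflow's actual code: the function name `might_contain_dag_sketch` is hypothetical, and the case-insensitive match is an assumption about how the filter behaves.

```python
def might_contain_dag_sketch(source: str, safe_mode: bool = True) -> bool:
    """Approximate the safe-mode filter: keep a file only if it mentions both strings."""
    if not safe_mode:
        # With safe mode off, every file is a candidate.
        return True
    lowered = source.lower()  # assumption: the match ignores case
    return "dag" in lowered and "airflow" in lowered


# The last lines of the pipeline file before and after the change.
original_tail = "# run the pipeline\npipeline.run()\n"
fixed_tail = "# run the pipeline airflow\nDAG = pipeline.run()\n"

print(might_contain_dag_sketch(original_tail))                   # False
print(might_contain_dag_sketch(fixed_tail))                      # True
print(might_contain_dag_sketch(original_tail, safe_mode=False))  # True
```

Under this sketch, a file that never mentions both words is skipped before it is even imported, which would explain an empty UI.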
# run the pipeline airflow
pipeline = super_mini_pipeline(data_spliter=split_data(),
                               train_evaltor=train_evaltor())
DAG = pipeline.run()

You can also edit the airflow.cfg file and turn off safe mode.
In $HOME/.config/zenml/airflow_root/<UUID>/airflow.cfg:
# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
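dag_discovery_safe_mode = False

Edit: there may be another reason. Airflow DAG discovery also relies on a DAG object being present in the module's globals(), so we may need to capture it with DAG = pipeline.run(). Either way, it works!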
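The globals() point can be illustrated with a small self-contained sketch. It does not use Airflow: `FakeDag`, `load_module`, and `collect_top_level_dags` are hypothetical stand-ins that only mimic the idea of scanning a module's top-level names for DAG objects.

```python
import types


class FakeDag:
    """Hypothetical stand-in for a DAG object, so the sketch runs without Airflow."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id


def load_module(name: str, source: str) -> types.ModuleType:
    """Execute source text as a fresh module that can see FakeDag."""
    module = types.ModuleType(name)
    module.__dict__["FakeDag"] = FakeDag
    exec(source, module.__dict__)
    return module


def collect_top_level_dags(module: types.ModuleType) -> list:
    """Return the DAG-like objects bound to names at module level."""
    return [obj for obj in vars(module).values() if isinstance(obj, FakeDag)]


# Building a DAG without keeping a reference leaves nothing to discover.
unbound = load_module("unbound", "FakeDag('demo')")
# Binding it to a module-level name makes it visible to the scan.
bound = load_module("bound", "DAG = FakeDag('demo')")

print(len(collect_top_level_dags(unbound)))  # 0
print(len(collect_top_level_dags(bound)))    # 1
```

In this sketch the only difference between the two modules is the assignment, which mirrors the change from pipeline.run() to DAG = pipeline.run().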
https://stackoverflow.com/questions/69959750