首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ZenML Dag未显示在Airflow UI中

ZenML Dag未显示在Airflow UI中
EN

Stack Overflow用户
提问于 2021-11-14 02:09:37
回答 1查看 91关注 0票数 1

我正在试用ZenML,它说它可以将我的.py管道转换为气流DAG。我遵循了这里的每一步:https://docs.zenml.io/guides/low-level-api/chapter-7,都成功了

我的管道在本地运行良好,但是为什么看不到在airflow UI上创建的DAG?UI完全是空的.

问题似乎是,ZenML将复制我以ZenML方式编写的.py管道,并期望它可以在气流中运行……在我的情况下,这是行不通的。有人知道如何让ZenML通过airflow成功运行我的代码吗?

下面是我的ZenML .py代码:

代码语言:javascript
复制
import pandas as pd
import numpy as np
import os

import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score

from zenml.pipelines import pipeline
from zenml.steps import step
from zenml.steps.step_output import Output
from zenml.steps.base_step_config import BaseStepConfig

class pipeline_config(BaseStepConfig):
    """
    Params used in the pipeline
    """
    label: str = 'species'

@step
def split_data(config: pipeline_config) -> Output(
    X=pd.DataFrame, y=pd.DataFrame
):
    path_to_csv = os.path.join('~/airflow/data', 'leaf.csv')
    df = pd.read_csv(path_to_csv)
    label = config.label

    y = df[[label]]
    X = df.drop(label, axis=1)

    return X, y


@step
def train_evaltor(
    config: pipeline_config,
    X: pd.DataFrame,
    y: pd.DataFrame
) -> float:
    y = y[config.label]

    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
    lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
    metrics_lst = []

    for train_idx, val_idx in folds.split(X, y):
        X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]

        lgbm.fit(X_train, y_train)
        y_pred = lgbm.predict(X_val)

        cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
        metrics_lst.append(cv_balanced_accuracy)

    avg_performance = np.mean(metrics_lst)
    print(f"Avg Performance: {avg_performance}")

    return avg_performance


@pipeline
def super_mini_pipeline(
    data_spliter,
    train_evaltor
):
    X, y = data_spliter()
    train_evaltor(X=X, y=y)


# run the pipeline
pipeline = super_mini_pipeline(data_spliter=split_data(),
                                train_evaltor=train_evaltor())
pipeline.run()
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-11-16 12:56:41

好了,它起作用了!如下图所示:

原因是,如果safe_mode处于启用状态(默认情况下处于启用状态),则必须在气流中出现单词airflowdag。这是airflow的特定逻辑,可以在airflow代码库中使用。

所以我所做的就是修改了最后几行:

代码语言:javascript
复制
# run the pipeline airflow
pipeline = super_mini_pipeline(data_spliter=split_data(),
                                train_evaltor=train_evaltor())
DAG = pipeline.run()

您还可以更改airflow.cfg文件并关闭安全模式:

$HOME/.config/zenml/airflow_root/<UUID>/airflow.cfg

代码语言:javascript
复制
# When discovering DAGs, ignore any files that don't contain the strings ``DAG`` and ``airflow``.
dag_discovery_safe_mode = False

编辑:可能还有另一个原因: Airflow DAG发现也依赖于全局DAG (),所以也许我们需要用DAG = pipeline.run()来捕捉它。所以在任何情况下,它都是有效的!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69959750

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档