首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用ExitHandler与Kubeflow管道SDK v2

如何使用ExitHandler与Kubeflow管道SDK v2
EN

Stack Overflow用户
提问于 2022-04-23 14:47:30
回答 2查看 941关注 0票数 1

我正在尝试将我所有的Kubeflow管道从使用之前的SDK v1 (kfp)移到更新的管线SDK v2 (kfp.v2)。我使用的是版本1.8.12.This重构,几乎所有的代码都证明是成功的,除了仍然存在的ExitHandlerfrom kfp.v2.dsl import ExitHandler。以前使用tar.gz文件将管道对象编译到-file中的方法保留了某种类型的Argo占位符,而使用compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json")的新.json管道的工作方式则不同。下面,我将详细介绍管道SDK v1中的工作原理以及我如何尝试在v2中实现它。

以前的,使用Kubeflow管道v1,我可以使用ExitHandler,如下图所示的在这个StackOverflow问题中。当其中一个管道组件失败时,向Slack发送一条消息。我会把管道定义为

代码语言:javascript
复制
import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
                    'python3', 'main.py',
                    '--message', 'Basic-pipeline failed'
                    '--status', "{{workflow.status}}"
                  ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')

如果管道的任何步骤失败,exit_task将将message发送到我们的Slack。exit_task映像的代码如下所示

代码语言:javascript
复制
import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)

这是可行的,因为底层的Argo工作流可以以某种方式理解"{{workflow.status}}"的概念。

但是,我现在正尝试使用顶点AI来运行管道,利用Kubeflow管道SDK v2,kfp.v2。现在,我使用与前面的'eu.gcr.io/.../send-error-msg-to-slack'相同的退出处理程序映像来定义一个yaml组件文件(exit_handler.yaml),

代码语言:javascript
复制
name: Exit handler
description: Prints to Slack if any step of the pipeline fails

inputs:
  - {name: message, type: String}
  - {name: status, type: String}

implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]

管道代码现在看起来像这样,

代码语言:javascript
复制
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)

        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )
    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()

这个“工作”(没有例外)可以编译和运行,但是ExitHandler代码将status解释为一个值{workflow.status}的字符串,这也是由上面的代码(basic-pipeline.json)生成的编译管道json所指示的,您可以在下面("stringValue": "{{workflow.status}}")看到:

代码语言:javascript
复制
...
         "exit-handler": {
            "componentRef": {
              "name": "comp-exit-handler"
            },
            "dependentTasks": [
              "exit-handler-1"
            ],
            "inputs": {
              "parameters": {
                "message": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "Basic pipeline failed."
                    }
                  }
                },
                "status": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "{{workflow.status}}"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "exit-handler"
            },
            "triggerPolicy": {
              "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
            }
          }
...

关于如何使用v1将旧的ExitHandler代码重构为新的SDK v2,以使退出处理程序了解如果管道的状态失败或没有的

EN

回答 2

Stack Overflow用户

发布于 2022-10-06 05:22:43

这可能还没有完全文档化,但在V2中,我们引入了一个不同的变量PipelineTaskFinalStatus,它可以自动填充,以便您将其发送到Slack通道。

下面是官方文档管道中退出处理程序的一个示例

下面是相应的电子邮件通知组件电子邮件/组件

您可以使用以下参数编写自己的组件,该参数将在退出处理程序运行时自动填充。

代码语言:javascript
复制
inputs:
...
  - name: pipeline_task_final_status
    type: PipelineTaskFinalStatus

(请注意,这个特性目前还不能在Kubeflow管道开放源码发行版中使用,并且将在KFP V2中使用。它只在顶点管道分布中可用)

票数 0
EN

Stack Overflow用户

发布于 2022-10-06 06:43:23

在KFP v2中替换v2是上面提到的IronPan中的特殊类型注释PipelineTaskFinalStatus

它的使用记录在https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/pipelines/#dslexithandler中。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71980705

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档