I am trying to move all of my Kubeflow pipelines from the previous SDK v1 (kfp) to the newer pipeline SDK v2 (kfp.v2). I am using version 1.8.12. This refactoring has been successful for almost all of the code, except for pipelines that use an ExitHandler, i.e. from kfp.v2.dsl import ExitHandler. The previous method of compiling the pipeline object into a tar.gz file preserved some kind of Argo placeholder, whereas the new .json pipelines produced with compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json") work differently. Below, I describe in detail what works in pipeline SDK v1 and how I have tried to implement it in v2.
Previously, using Kubeflow Pipelines v1, I could use an ExitHandler as shown in this StackOverflow question, for example to send a message to Slack when one of the pipeline components failed. I would define the pipeline as
import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
            'python3', 'main.py',
            '--message', 'Basic-pipeline failed',
            '--status', "{{workflow.status}}"
        ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')

If any step of the pipeline failed, exit_task would send the message to our Slack. The code for the exit_task image looked like this
import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)

This worked because the underlying Argo workflow could somehow understand the "{{workflow.status}}" placeholder.
However, I am now trying to run the pipelines on Vertex AI, using the Kubeflow pipeline SDK v2, kfp.v2. Using the same exit-handler image 'eu.gcr.io/.../send-error-msg-to-slack' as before, I now define a yaml component file (exit_handler.yaml),
name: Exit handler
description: Prints to Slack if any step of the pipeline fails
inputs:
  - {name: message, type: String}
  - {name: status, type: String}
implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]

The pipeline code now looks like this,
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)
        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )
    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)
    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()

This "works": it compiles and runs without exceptions, but the exit-handler code receives status as a literal string with the value "{{workflow.status}}". This is also indicated by the compiled pipeline json (basic-pipeline.json) generated from the code above, as you can see below ("stringValue": "{{workflow.status}}"):
...
"exit-handler": {
  "componentRef": {
    "name": "comp-exit-handler"
  },
  "dependentTasks": [
    "exit-handler-1"
  ],
  "inputs": {
    "parameters": {
      "message": {
        "runtimeValue": {
          "constantValue": {
            "stringValue": "Basic pipeline failed."
          }
        }
      },
      "status": {
        "runtimeValue": {
          "constantValue": {
            "stringValue": "{{workflow.status}}"
          }
        }
      }
    }
  },
  "taskInfo": {
    "name": "exit-handler"
  },
  "triggerPolicy": {
    "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
  }
}
...

Any ideas on how to refactor my old ExitHandler code from SDK v1 to the new SDK v2, so that the exit handler understands whether the pipeline's status is failed or not?
Posted on 2022-10-06 05:22:43
Posted on 2022-10-06 06:43:23
The replacement in KFP v2 is the special type annotation PipelineTaskFinalStatus, mentioned by IronPan above.
Its usage is documented at https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/pipelines/#dslexithandler.
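Per the linked documentation, the exit handler becomes a component with a parameter annotated as PipelineTaskFinalStatus; the KFP backend injects the final pipeline state into it at runtime, so no "{{workflow.status}}" placeholder is passed by the caller. Since the real object only exists inside a pipeline run, the sketch below uses a hypothetical dataclass stand-in with some of the documented fields to illustrate the handler logic; in real code you would instead import it with from kfp.v2.dsl import PipelineTaskFinalStatus and decorate the function with @dsl.component.

```python
from dataclasses import dataclass

# Hypothetical stand-in for kfp.v2.dsl.PipelineTaskFinalStatus, mirroring
# fields named in the KFP v2 docs; the real object is injected by the backend.
@dataclass
class PipelineTaskFinalStatus:
    state: str                        # e.g. "SUCCEEDED" or "FAILED"
    pipeline_job_resource_name: str = ""
    error_message: str = ""

def exit_op(message: str, status: PipelineTaskFinalStatus) -> str:
    """Exit-handler body: build the Slack message only when the run failed.

    In KFP v2 this function would carry @dsl.component, and annotating the
    status parameter with the real PipelineTaskFinalStatus tells KFP to fill
    it in automatically at pipeline completion.
    """
    if status.state == "FAILED":
        return f"{message} Error: {status.error_message}"
    return ""
```

The component carrying this annotation is then passed to ExitHandler exactly as before; only the status plumbing changes.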
https://stackoverflow.com/questions/71980705