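This is my first time using Google's Vertex AI Pipelines. I went through this codelab as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work in a toy example: I planned to build a pipeline consisting of two components: "get-data" (which reads some .csv file stored in Cloud Storage) and "report" (which basically returns the shape of the .csv data read in the previous component). In addition, I was careful to include several suggestions given in this forum. My current code is as follows: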
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform
# Components section
@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')
@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape
# Pipeline section
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)
    dimensions = report_data(
        dataset_task.output
    )
# Compilation section
compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)
# Running and submitting job
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)
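run1.submit()

I was happy to see that the pipeline compiled without any errors, and I managed to submit the job. However, my happiness was short-lived: when I went to Vertex AI Pipelines, I stumbled upon an error like this:

The DAG failed because some tasks failed. The failed tasks are: get-data.; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}

I did not find any related information on the web, nor could I find any logs or anything similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example is still eluding me.

Clearly, I don't know what I am getting wrong, or where. Any suggestions?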
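Posted on 2022-04-22 19:57:49

With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code: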
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple
# Importing 'COMPONENTS' of the 'PIPELINE'
@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    # 'Pulling' demo .csv data from a known location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv", index=False, encoding='utf-8-sig')
@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    df = pd.read_csv(inputd.path + ".csv")
    return df.shape
# Building the 'PIPELINE'
# In my case, the pipeline root is the following GCS location
PIPELINE_ROOT = "gs://my-bucket/test_vertex/pipeline_root/"

@pipeline(
    # Default pipeline root; can be overridden when submitting the pipeline
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)
    dimensions = report_data(
        dataset_task.output
    )
# Compiling the 'PIPELINE'
compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)
# Running the 'PIPELINE'
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)
# Submitting the 'PIPELINE'
run1.submit()

Now, I will add some complementary comments which, taken together, managed to solve my issue:


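When working with Input[Dataset] or Output[Dataset], the file extension needs to be added (and it is quite easy to forget). Take for instance the output of the get_data component, and notice how the data is written by explicitly adding the file extension, i.e. dataset.path + ".csv". The report_data component then has to read from inputd.path + ".csv" as well.

Of course, this is a very tiny example, and projects can easily scale to huge ones; however, as some sort of "Hello Pipeline" it will work well.

Thank you.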
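The file-extension point can be reproduced locally without KFP. The following is only a minimal sketch: `FakeDataset`, `write_rows` and `read_shape` are hypothetical names made up for illustration (they are not part of the KFP API), and the standard `csv` module stands in for pandas. It shows that a file written to `path + ".csv"` can only be found again by a reader that appends the same suffix.

```python
import csv
import os
import tempfile


class FakeDataset:
    """Hypothetical stand-in for a Dataset artifact: it only carries a path."""

    def __init__(self, path: str):
        self.path = path


def write_rows(dataset: FakeDataset, rows: list) -> None:
    # Mirrors get_data: the file is written to path + ".csv", not to path itself.
    with open(dataset.path + ".csv", "w", newline="") as f:
        csv.writer(f).writerows(rows)


def read_shape(dataset: FakeDataset, suffix: str = ".csv") -> tuple:
    # Mirrors report_data: the reader must append the suffix the writer used.
    with open(dataset.path + suffix, newline="") as f:
        rows = list(csv.reader(f))
    header, body = rows[0], rows[1:]
    return len(body), len(header)


workdir = tempfile.mkdtemp()
artifact = FakeDataset(os.path.join(workdir, "dataset"))
write_rows(artifact, [["a", "b"], ["1", "2"], ["3", "4"]])

# Same suffix on both sides: the file is found.
shape = read_shape(artifact)

# Suffix forgotten on the reading side, as in the question's report_data.
try:
    read_shape(artifact, suffix="")
    missing_suffix_failed = False
except FileNotFoundError:
    missing_suffix_failed = True
```

In the question's code the writer used `dataset.path + ".csv"` while the reader used `inputd.path`, which is the mismatch this sketch imitates.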
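Posted on 2022-09-14 21:34:27

Thanks for your write-up. Very helpful! I got the same error, but it turned out to have a different cause, so I am noting it here. In my pipeline definition step I have the following parameters: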
def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,
                bq_source_dataset: str = BQ_SOURCE_DATASET,
                bq_source_table: str = BQ_SOURCE_TABLE,
                output_data_path: str = "crime_data.csv"):
My mistake was that, when I ran the pipeline, I did not pass in these same parameters. Below is the fixed version:
job = pipeline_jobs.PipelineJob(
    project=PROJECT_ID,
    location=LOCATION,
    display_name=PIPELINE_NAME,
    job_id=JOB_ID,
    template_path=FILENAME,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,
                      'bq_source_dataset': BQ_SOURCE_DATASET,
                      'bq_source_table': BQ_SOURCE_TABLE}
)
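A mismatch like this can be caught before submitting the job. The sketch below is one possible way to do it, under stated assumptions: `check_parameter_values` is a hypothetical helper (not part of `kfp` or `google-cloud-aiplatform`), the `BQ_*` values are placeholders, and the check is run against a plain, undecorated function with the same signature as the pipeline.

```python
import inspect

# Placeholder values; the real ones are not shown in the answer above.
BQ_SOURCE_PROJECT = "example-project"
BQ_SOURCE_DATASET = "example_dataset"
BQ_SOURCE_TABLE = "example_table"


def my_pipeline(
    bq_source_project: str = BQ_SOURCE_PROJECT,
    bq_source_dataset: str = BQ_SOURCE_DATASET,
    bq_source_table: str = BQ_SOURCE_TABLE,
    output_data_path: str = "crime_data.csv",
):
    """Plain function standing in for the pipeline definition."""


def check_parameter_values(func, parameter_values: dict) -> dict:
    """Hypothetical helper: compare parameter_values with func's signature."""
    params = inspect.signature(func).parameters
    # Keys that the pipeline function does not declare at all.
    unknown = sorted(set(parameter_values) - set(params))
    # Declared parameters that have no default and were not supplied.
    missing = sorted(
        name
        for name, p in params.items()
        if p.default is inspect.Parameter.empty and name not in parameter_values
    )
    return {"unknown": unknown, "missing": missing}


good = check_parameter_values(
    my_pipeline,
    {
        "bq_source_project": BQ_SOURCE_PROJECT,
        "bq_source_dataset": BQ_SOURCE_DATASET,
        "bq_source_table": BQ_SOURCE_TABLE,
    },
)

# A misspelled key is reported as unknown.
bad = check_parameter_values(my_pipeline, {"source_project": BQ_SOURCE_PROJECT})
```

Running such a check right before creating the PipelineJob would surface a misnamed key locally, rather than as a failed DAG in the console.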
https://stackoverflow.com/questions/71962260