首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >顶点AI管道中的数据读取

顶点AI管道中的数据读取
EN

Stack Overflow用户
提问于 2022-04-22 00:40:56
回答 2查看 1.3K关注 0票数 4

这是我第一次使用谷歌的顶点AI管道。我检查了这码标以及这个职位这个职位,在一些从正式文件派生的链接之上。在一个玩具例子中,我决定让所有的知识都发挥作用:我计划构建一个由两个组件组成的管道:"get- data“(它读取存储在云存储中的一些.csv文件)和”report“(基本上返回在前面组件中读取的.csv数据的形状)。此外,我谨慎地将几点建议包括在这个论坛中。我目前的代码如下:

代码语言:javascript
复制
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

我很高兴看到编译的管道没有任何错误,并设法提交了作业。然而,“我的幸福持续时间很短”,比如当我去顶点AI管道时,我偶然发现了一些“错误”,比如:

由于某些任务失败,DAG失败。失败的任务是: get-data.;作业(project_id = my-project,job_id = 4290278978419163136)由于上述错误而失败。;未能处理作业:{project_number = xxxxxxxx,job_id = 4290278978419163136}

我没有在网上找到任何相关的信息,也没有找到任何日志或类似的东西,我觉得有点不知所措,这个(看似)简单的例子的解决方案,仍然在逃避我。

很明显,我不知道我在哪里搞错了什么。有什么建议吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-04-22 19:57:49

在评论中提供了一些建议,我认为我成功地使我的演示管道工作。我将首先包括更新的代码:

代码语言:javascript
复制
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple


# Importing 'COMPONENTS' of the 'PIPELINE'

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    
    # 'Pulling' demo .csv data from a know location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    
    df = pd.read_csv(inputd.path+".csv")
    
    return df.shape


# Building the 'PIPELINE'

@pipeline(
    # i.e. in my case: PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/'
    # Can be overriden when submitting the pipeline
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )
    

# Compiling the 'PIPELINE'    

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)


# Running the 'PIPELINE'

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)

# Submitting the 'PIPELINE'

run1.submit()

现在,我要补充一些补充意见,总之,这些评论设法解决了我的问题:

  • 首先,为用户启用“logging.viewer”(角色/日志查看器),将极大地帮助解决管道中的任何现有错误(注意:该角色对我有效,但是您可能希望为自己的目的寻找更好的匹配角色这里)。这些错误将显示为“日志”,可以通过单击相应的按钮来访问:

  • 注意:在上面的图片中,当显示“日志”时,仔细检查每个日志(接近您创建管道的时间)可能会有帮助,因为通常每个日志对应于一个警告或错误行:

  • 其次,我的管道输出是一个元组。在我最初的方法中,我只是返回普通元组,但是建议它返回一个NamedTuple。通常,如果您需要输入/输出一个或多个“小值”(int或str,由于任何原因),请选择一个NamedTuple。
  • 第三,当管道之间的连接是Input[Dataset]Ouput[Dataset]时,需要添加文件扩展名(而且很容易忘记)。例如,以get_data组件的输出为例,注意如何通过具体添加文件扩展名(即dataset.path + ".csv" )来记录数据。

当然,这是一个非常小的例子,项目可以很容易地扩展到巨大的项目,然而,作为某种"Hello管道“,它将很好地工作。

谢谢。

票数 4
EN

Stack Overflow用户

发布于 2022-09-14 21:34:27

谢谢你的来信。很有帮助!我也犯了同样的错误,但结果是有不同的原因,所以在这里注意到.在我的管道定义步骤中,我有以下参数..。

“”“

代码语言:javascript
复制
def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,  
                    bq_source_dataset: str = BQ_SOURCE_DATASET,  
                    bq_source_table: str = BQ_SOURCE_TABLE,  
                    output_data_path: str = "crime_data.csv"):

“”“

我的错误是当我运行管道时,没有输入相同的参数。下面是固定版本..。“”“

代码语言:javascript
复制
job = pipeline_jobs.PipelineJob(  
project=PROJECT_ID,  
      location=LOCATION,  
      display_name=PIPELINE_NAME,  
      job_id=JOB_ID,  
      template_path=FILENAME,  
      pipeline_root=PIPELINE_ROOT,  
      parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,  
                          'bq_source_dataset': BQ_SOURCE_DATASET,  
                          'bq_source_table': BQ_SOURCE_TABLE}  

“”“

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71962260

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档