首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在顶点AI中安排自定义培训作业的重复运行

如何在顶点AI中安排自定义培训作业的重复运行
EN

Stack Overflow用户
提问于 2021-08-15 16:12:43
回答 1查看 3.1K关注 0票数 3

我已经将我的训练代码打包成一个python包,然后能够在顶点AI上作为一个定制的培训任务来运行它。现在,我希望能安排这份工作,比如说每两周一次,然后重新训练这个模型。CustomJobSpec中的调度设置只允许两个字段,即“超时值”和"restartJobOnWorkerRestart“,因此不能使用CustomJoBSpec中的调度设置。实现这一目标的一种方法是使用"CustomPythonPackageTrainingJobRunOp“Google管道组件创建一个顶点AI管道,然后按我认为合适的方式调度管道运行。是否有更好的替代办法来实现这一目标?

编辑:

我能够使用Cloud来安排定制的培训工作,但是我发现在AIPlatformClient中使用AIPlatformClient方法很容易在顶点AI管道中使用。我在gcp中使用Cloud计划定制作业的步骤如下,链接到google:

  1. 将目标类型设置为HTTP
  2. 为了让url指定自定义作业,我按照链接获得url
  3. 对于身份验证,在Auth下,我选择了"Add OAauth令牌“

您还需要在您的项目中有一个“服务帐户”,其中包含一个“服务代理角色”。虽然如果在2019年3月19日之后启用了Cloud,那么应该自动设置docs --但对我来说并非如此,必须手动添加服务帐户。

EN

回答 1

Stack Overflow用户

发布于 2021-08-16 14:02:14

根据您的要求,各种可能的调度方法:

1.云编写器

云作曲家是一个托管Apache气流,它帮助您创建、调度、监视和管理工作流。

您可以按照下面提到的步骤,每两周使用Composer来安排您的工作:

  • 创建一个作曲家环境。
  • 编写一个达格文件,并将您的自定义培训python代码添加到DAG文件中。
  • 由于自定义培训作业是python代码,所以可以使用PythonOperator来调度任务。
  • 在DAG文件中,您需要提供开始时间,即从哪个时间开始调度,您需要将计划间隔定义为两个星期,如下所示:
代码语言:javascript
复制
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

另外,您也可以使用unix-cron字符串格式(* * *)来执行调度。

也就是说,在每两周调度一次的情况下,cron格式将类似于:* * 1,15 * *

可以使用PythonOperator在操作中传递自定义作业所需的参数。

在写入DAG文件之后,您需要将其推送到Composer Environment桶中的dags/文件夹中。

您可以在气流UI中检查计划中的DAG状态。

计划中的DAG文件如下所示:

sample_dag.py :

代码语言:javascript
复制
from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:
    
    def create_custom_job_sample(
    project: str,
    display_name: str,
    container_image_uri: str,
    location: str,
    api_endpoint: str,
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    custom_job = {
        "display_name": display_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                        "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                        "accelerator_count": 1,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": container_image_uri,
                        "command": [],
                        "args": [],
                    },
                }
            ]
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_custom_job(parent=parent, custom_job=custom_job)
    print("response:", response)
    
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={"project" : "your_project","display_name" : "name","container_image_uri":"uri path","location": "us-central1","api_endpoint":"us-central1-aiplatform.googleapis.com"}
        )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='job scheduled')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

2.云调度程序:使用云调度器调度作业,您需要执行以下配置:

  • Target : HTTP
  • 作业的端点URL (“us-central1-aiplatform.googleapis.com”)示例:URL )
  • Auth header:Google托管在*.googleapis.com上的OAuth令牌

3.使用Kubeflow管道SDK运行 调度循环管道

您可以使用Python和Kubeflow管道SDK调度循环管道运行。

代码语言:javascript
复制
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                           region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule=* * 1,15 * *,
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68793294

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档