我已经将我的训练代码打包成一个python包,然后能够在顶点AI上作为一个定制的培训任务来运行它。现在,我希望能安排这份工作,比如说每两周一次,然后重新训练这个模型。CustomJobSpec中的调度设置只允许两个字段,即“超时值”和"restartJobOnWorkerRestart“,因此不能使用CustomJoBSpec中的调度设置。实现这一目标的一种方法是使用"CustomPythonPackageTrainingJobRunOp“Google管道组件创建一个顶点AI管道,然后按我认为合适的方式调度管道运行。是否有更好的替代办法来实现这一目标?
编辑:
我能够使用Cloud来安排定制的培训工作,但是我发现在AIPlatformClient中使用AIPlatformClient方法很容易在顶点AI管道中使用。我在gcp中使用Cloud计划定制作业的步骤如下,链接到google:
您还需要在您的项目中有一个“服务帐户”,其中包含一个“服务代理角色”。虽然如果在2019年3月19日之后启用了Cloud,那么应该自动设置docs --但对我来说并非如此,必须手动添加服务帐户。
发布于 2021-08-16 14:02:14
根据您的要求,各种可能的调度方法:
1.云编写器
云作曲家是一个托管Apache气流,它帮助您创建、调度、监视和管理工作流。
您可以按照下面提到的步骤,每两周使用Composer来安排您的工作:
with models.DAG(
'composer_sample_bq_notify',
schedule_interval=datetime.timedelta(weeks=2),
default_args=default_dag_args) as dag:另外,您也可以使用unix-cron字符串格式(* * *)来执行调度。
也就是说,在每两周调度一次的情况下,cron格式将类似于:* * 1,15 * *
可以使用PythonOperator在操作中传递自定义作业所需的参数。
在写入DAG文件之后,您需要将其推送到Composer Environment桶中的dags/文件夹中。
您可以在气流UI中检查计划中的DAG状态。
计划中的DAG文件如下所示:
sample_dag.py :
from __future__ import print_function
import datetime
from google.cloud import aiplatform
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': YESTERDAY,
}
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(weeks=2),
default_args=default_dag_args) as dag:
def create_custom_job_sample(
project: str,
display_name: str,
container_image_uri: str,
location: str,
api_endpoint: str,
):
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(client_options=client_options)
custom_job = {
"display_name": display_name,
"job_spec": {
"worker_pool_specs": [
{
"machine_spec": {
"machine_type": "n1-standard-4",
"accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
"accelerator_count": 1,
},
"replica_count": 1,
"container_spec": {
"image_uri": container_image_uri,
"command": [],
"args": [],
},
}
]
},
}
parent = f"projects/{project}/locations/{location}"
response = client.create_custom_job(parent=parent, custom_job=custom_job)
print("response:", response)
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=create_custom_job_sample,
op_kwargs={"project" : "your_project","display_name" : "name","container_image_uri":"uri path","location": "us-central1","api_endpoint":"us-central1-aiplatform.googleapis.com"}
)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='job scheduled')
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash2.云调度程序:使用云调度器调度作业,您需要执行以下配置:
3.使用Kubeflow管道SDK运行 调度循环管道
您可以使用Python和Kubeflow管道SDK调度循环管道运行。
from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID,
region=REGION)
api_client.create_schedule_from_job_spec(
job_spec_path=COMPILED_PIPELINE_PATH,
schedule=* * 1,15 * *,
time_zone=TIME_ZONE,
parameter_values=PIPELINE_PARAMETERS
)https://stackoverflow.com/questions/68793294
复制相似问题