我有一个气流调度程序和气流网络服务器部署在EC2机器上的AWS.我使用这个气流调度程序来执行带有AwsBatchOperator任务的DAG。此任务执行EC2机器上出现的python脚本。以下是DAG的代码:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator
default_args = {
'owner': 'admin',
'concurrency': 3,
'depends_on_past': True,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': None,
'end_date': None,
'schedule_interval': None,
}
dag = DAG(
dag_id='my-dag',
default_args=default_args,
description='My DAG',
schedule_interval='00 03 * * *',
start_date=days_ago(1),
tags=['dev'],
)
task = AwsBatchOperator(
dag=dag,
job_name= 'my-job-name',
job_definition= 'arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
job_queue= 'arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
region_name= 'eu-central-1',
task_id= 'my-task-id',
overrides={
'command': ['python3', './my_python_script.py']
},
parameters= {}
)python脚本my_python_script.py位于部署气流的EC2机器上,位于/home/ubuntu目录下。
我有一个错误,在这个python脚本中引发了一个错误。我对其进行了更正,并将更正后的脚本推到了EC2机器上。但是,当我执行DAG时,仍然会得到由我更正的错误引起的错误。所以我的问题是:
如何刷新DAG以确保它使用我的EC2机器上的脚本版本?
我试过的
通过单击气流web接口airflow-scheduler
python -m compileall在EC2机器上等待 python脚本。发布于 2021-07-15 16:00:00
要更新AwsBatchOperator 任务的代码,您需要用新代码更新批处理作业使用的停靠映像,而不是部署气流的EC2机器上的代码。
AwsBatchOperator可以执行AWS批处理作业定义码头映像中的代码,但不能执行部署气流的EC2机器中的代码。
使用AwsBatchOperator时,可以设置要使用的作业定义。就我的情况而言,这是arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name。此作业定义包含将执行命令的停靠器映像。请参阅https://docs.aws.amazon.com/en_en/batch/latest/userguide/Batch_GetStarted.html
我感到困惑,因为它是相同的代码基,是存在于EC2机器和码头形象。因此,与其在部署气流的EC2机器上更新代码,我只需用我的代码的新版本创建一个新的码头映像,并使我的AWS批处理作业使用这个新映像。
https://stackoverflow.com/questions/68391291
复制相似问题