
Airflow DAG BashOperator: permission denied

Stack Overflow user
Asked on 2019-10-10 23:35:29
1 answer · 4K views · 0 followers · score 2

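I am very new to Airflow and am trying to run an ETL process every 5 minutes. I have an Airflow DAG scheduled to run every 5 minutes, but the DAG fails with the error messages "Bash command failed" and "Permission denied".

The DAG is essentially an ETL process with one BashOperator (the task that fails) and three PythonOperators that run downstream of the BashOperator.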

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator 
from airflow.contrib.sensors.file_sensor import FileSensor

from bin.int_medications import int_meds_auto_updt, storage, insert, del_stag, int_med_stag_clean


DAG_DEFAULT_ARGS = {
'owner':'airflow',
'depends_on_past':False,
'retires':1,
}


dag3 = DAG(dag_id = 'int_meds_dag_v1', 
           start_date=datetime(2019, 10, 10), 
           default_args = DAG_DEFAULT_ARGS,
           schedule_interval = '*/5 * * * *',
           catchup = False)


cmd_command = "/home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
data_loading = BashOperator(
         task_id = "int_meds",
         bash_command = cmd_command,
         dag=dag3)


data_cleaning = PythonOperator(task_id = 'data_cleaning', python_callable = int_med_stag_clean.clean_stag)
data_insert = PythonOperator(task_id = 'data_insert', python_callable = insert.insert_stag)
data_delete = PythonOperator(task_id = 'data_delete', python_callable = del_stag.delete_stag)

data_loading >> data_cleaning >> data_insert >> data_delete

The code of the DAG file is above, and the error message is below.

*** Reading local file: /home/akash/airflow/logs/int_meds_dag_v1/int_meds/2019-10-10T14:45:00+00:00/1.log
[2019-10-10 10:50:26,649] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,652] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-10 10:50:26,652] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,659] {__init__.py:1374} INFO - Executing <Task(BashOperator): int_meds> on 2019-10-10T14:45:00+00:00
[2019-10-10 10:50:26,659] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'int_meds_dag_v1', 'int_meds', '2019-10-10T14:45:00+00:00', '--job_id', '15495', '--raw', '-sd', 'DAGS_FOLDER/int_med_dag.py', '--cfg_path', '/tmp/tmpenegd6zi']
[2019-10-10 10:50:28,319] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,318] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-10 10:50:28,436] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,436] {__init__.py:305} INFO - Filling up the DagBag from /home/akash/airflow/dags/int_med_dag.py
[2019-10-10 10:50:29,739] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:29,739] {cli.py:517} INFO - Running <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [running]> on host TRLPowerSpec.local
[2019-10-10 10:50:29,751] {bash_operator.py:81} INFO - Tmp dir root location: 
 /tmp
[2019-10-10 10:50:29,751] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=int_meds_dag_v1
AIRFLOW_CTX_TASK_ID=int_meds
AIRFLOW_CTX_EXECUTION_DATE=2019-10-10T14:45:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-10T14:45:00+00:00
[2019-10-10 10:50:29,751] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v
[2019-10-10 10:50:29,751] {bash_operator.py:114} INFO - Running command: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py
[2019-10-10 10:50:29,756] {bash_operator.py:123} INFO - Output:
[2019-10-10 10:50:29,757] {bash_operator.py:127} INFO - /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v: line 1: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py: Permission denied
[2019-10-10 10:50:29,757] {bash_operator.py:131} INFO - Command exited with return code 126
[2019-10-10 10:50:29,760] {__init__.py:1580} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:29,761] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds Traceback (most recent call last):
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/bin/airflow", line 32, in <module>
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     args.func(args)
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     return f(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     _run(args, dag, ti)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     pool=args.pool,
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     return func(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     result = task_copy.execute(context=context)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     raise AirflowException("Bash command failed")
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:31,649] {logging_mixin.py:95} INFO - [2019-10-10 10:50:31,649] {jobs.py:2562} INFO - Task exited with return code 1

I also tried granting permissions on the Python file with the following command:

sudo chmod -R -f 777 /path/to/file

However, Airflow still threw the same error.

I would really appreciate it if someone could tell me what the error is and how to fix it.


1 Answer

Stack Overflow user

Accepted answer

Posted on 2019-10-11 19:33:29

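The BashOperator's bash_command parameter expects either a bash script file (in which case the file extension should be .sh) or a bash command. Given only the path to a .py file, bash tries to execute that file directly, which requires execute permission on it; return code 126 in the log is bash's exit status for a command that was found but could not be executed. Try replacing cmd_command with: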

cmd_command = "python /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
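
The difference between the two forms of the command can be reproduced outside Airflow. The following is a minimal sketch, not the asker's setup: it creates a hypothetical stand-in script (`etl_script.py`) with no execute bit, then runs it through bash once as a bare path and once with an interpreter prefix. The helper names `run_in_bash` and `demo` are made up for illustration, and `bash -c` only approximates how BashOperator invokes the command.

```python
import os
import shlex
import stat
import subprocess
import sys
import tempfile


def run_in_bash(command):
    """Run a command string through bash and capture its output."""
    return subprocess.run(["bash", "-c", command], capture_output=True, text=True)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for the ETL script: readable, but not executable.
        script = os.path.join(tmp, "etl_script.py")
        with open(script, "w") as f:
            f.write("print('ETL ran')\n")
        os.chmod(script, stat.S_IRUSR | stat.S_IWUSR)  # rw-------

        # Bare path: bash tries to execute the file itself and is refused.
        direct = run_in_bash(shlex.quote(script))

        # Interpreter prefix: the interpreter only needs to read the file.
        via_python = run_in_bash(
            f"{shlex.quote(sys.executable)} {shlex.quote(script)}"
        )
        return direct, via_python


if __name__ == "__main__":
    direct, via_python = demo()
    print(direct.returncode, direct.stderr.strip())
    print(via_python.returncode, via_python.stdout.strip())
```

The bare-path run fails with the same "Permission denied" and return code 126 seen in the task log, while the prefixed run succeeds because only read access to the script is needed.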

Alternatively, you can use a PythonOperator instead and run the code from int_meds_auto_updt.py.
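
For the PythonOperator route, the script's logic has to be reachable as a callable. The contents of int_meds_auto_updt.py are not shown in the question, so the following is only a sketch of how such a module might be laid out; `run_update` is a hypothetical entry point, and the body is a placeholder.

```python
def run_update():
    """Hypothetical entry point; the real ETL load logic would go here."""
    rows_loaded = 0  # placeholder for the actual load step
    return rows_loaded


if __name__ == "__main__":
    # The file still works as a standalone script:
    #   python int_meds_auto_updt.py
    run_update()

# In the DAG file, which already imports int_meds_auto_updt, the task
# could then be declared like the other PythonOperators (assuming the
# hypothetical run_update name above):
#
# data_loading = PythonOperator(
#     task_id="int_meds",
#     python_callable=int_meds_auto_updt.run_update,
#     dag=dag3,
# )
```

With this layout no file permissions on the script are involved at run time, since the code is imported by the worker process rather than executed by bash.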

Score: 1

Original content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/58326499
