我有一个简单的工作,如果可能的话,我想在气流过程中移动。现在,我有一串bash脚本,它们访问服务器并下载文件的最新版本,然后对该文件执行各种下游操作。
exec ./somescript.sh somefileurl我想知道的是:当我每次需要运行这个进程时,我如何才能将URL传递给这个文件?
似乎如果我尝试将bash脚本作为bash命令运行,如下所示:
download = BashOperator(
task_id='download_release',
bash_command='somescript.sh',
# params={'URL': 'somefileurl'},
dag=dag)我无法传递bash脚本所需的一个参数。否则,如果我尝试以bash命令的形式发送bash脚本,如下所示:
download = BashOperator(
task_id='download_release',
bash_command='./somescript.sh {{ URL }}',
params={'URL': 'somefileurl'},
dag=dag)当程序尝试在临时目录的上下文中执行脚本时,我收到执行错误。这会破坏脚本,因为它需要访问位于同一目录中的一些凭证文件,并且我希望保持相对文件位置的完整性……
有什么想法?
更新:对我有效的方法
download = BashOperator(
task_id='download_release',
bash_command='cd {{ params.dir }} && ./somescript.sh {{ params.url }}',
params={'url': 'somefileurl',
'dir': 'somedir'},
dag=dag)不过,我还没有实现任何参数传递。
发布于 2017-02-28 23:41:26
下面是一个向BashOperator传递参数的示例:
templated_command = """
cd /working_directory
somescript.sh {{ dag_run.conf['URL'] }}
"""
download = BashOperator(
task_id='download_release',
bash_command=templated_command,
dag=dag)有关这方面的讨论,请参阅passing parameters to externally trigged dag。Airflow有两个示例DAG来演示这一点:example_trigger_controller_dag和example_trigger_target_dag。另请参阅Airflow api reference on macros。
https://stackoverflow.com/questions/42512305
复制相似问题