我有一个PySpark应用程序,我想用Oozie来调度,使用shell操作。我的submit-application.sh脚本只是初始化一个Python (出现在所有工作节点上),并调用application.py Python应用程序脚本。
application.py脚本是一个PySpark应用程序,它附带了一个本地Python模块,比方说称为foobar,它只是在整个代码中导入和使用。
因此,我有一个类似于以下目录的结构:
.
├── foobar
│ ├── config.py
│ ├── foobar.py
│ └── __init__.py
├── application.DEV.ini
├── application.PROD.ini
├── application.py
├── requirements.txt
└── submit-application.sh我正在尝试使用Oozie工作流来打包所有脚本和本地模块文件,但是显然,它们总是以扁平的形式交付到容器的根目录目录中,而不管我使用了什么配置。这将防止Python脚本加载本地模块,从而导致ModuleNotFoundError: No module named 'foobar'错误。
难道没有办法告诉Oozie将文件工件放在子目录中吗?似乎#符号被忽略了。
这是我的Oozie workflow.xml文件
<workflow-app name="Data-Extraction-WF" xmlns="uri:oozie:workflow:0.5">
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="Data-Extraction"/>
<action name="Data-Extraction">
<shell xmlns="uri:oozie:shell-action:1.0">
<exec>submit-application.sh</exec>
<file>app/__init__.py#app/__init__.py</file>
<file>app/config.py#app/config.py</file>
<file>app/foobar.py#app/foobar.py</file>
<file>application.DEV.ini#application.DEV.ini</file>
<file>application.PROD.ini#application.PROD.ini</file>
<file>application.py#application.py</file>
<file>submit-application.sh#submit-application.sh</file>
<capture-output/>
</shell>
<ok to="success"/>
<error to="failure"/>
</action>
<kill name="failure">
<message>Workflow failed, error message: [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="success"/>
</workflow-app>发布于 2022-03-31 09:09:40
最后,我创建了一个包装器脚本,它从HDFS获取文件,并在Oozie工作流中简单地使用该脚本。除了工作流的HDFS位置之外,步骤(子目录)被传递给这个脚本,然后脚本下载整个目录并在其中执行运行脚本。
<workflow-app name="Data-Extraction-WF" xmlns="uri:oozie:workflow:0.5">
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="Data-Extraction"/>
<action name="Data-Extraction">
<shell xmlns="uri:oozie:shell-action:1.0">
<exec>execute_workflow_step.sh</exec>
<argument>-w</argument>
<argument>${wf:conf('oozie.wf.application.path')}</argument>
<argument>-s</argument>
<argument>data-transformation</argument>
<file>execute_workflow_step.sh</file>
</shell>
<ok to="success"/>
<error to="failure"/>
</action>
<kill name="failure">
<message>Workflow failed, error message: [${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="success"/>
</workflow-app>这是我的execute_workflow_step.sh脚本:它从工作流的HDFS目录下载step目录并执行其运行脚本:
#!/usr/bin/env bash
set -euo pipefail
IFS=$'\n\t'
err_trap() {
echo "*** FAILED: Error on line $1"
exit 1
}
trap 'err_trap $LINENO' ERR
usage() { echo "Usage: $0 [-w <workflow HDFS path>] [-s <step-directory>] [-p <step submit script parameters>]" 1>&2; exit 1; }
PARAMETERS=""
while getopts ":w:s:p:" o; do
case "${o}" in
w)
WORKFLOW_PATH=${OPTARG}
;;
s)
STEP_DIRECTORY=${OPTARG}
;;
p)
PARAMETERS=${OPTARG}
;;
*)
usage
;;
esac
done
shift $((OPTIND-1))
if [ -z "${WORKFLOW_PATH}" ] || [ -z "${STEP_DIRECTORY}" ]; then
usage
fi
HDFS_BASEDIR=$(dirname "${WORKFLOW_PATH}")
WORKFLOW_STEP_DIRECTORY="${HDFS_BASEDIR}/${STEP_DIRECTORY}"
echo "Getting: ${WORKFLOW_STEP_DIRECTORY}"
hdfs dfs -get "${WORKFLOW_STEP_DIRECTORY}"
STEP_SCRIPT="${STEP_DIRECTORY}/submit-application.sh"
chmod 755 "$STEP_SCRIPT"
echo "Step submit script: ${STEP_SCRIPT}"
echo "Parameters: ${PARAMETERS}"
echo "Invoking: ${STEP_SCRIPT} ${PARAMETERS}"
"${STEP_SCRIPT}" "${PARAMETERS}"https://stackoverflow.com/questions/71606959
复制相似问题