我有一个作业A,它需要调用Azkaban流"F“作为依赖项。我如何提及工作A对流程F的依赖?
下面是与获取远程存储的流“F”有关的内容:
session = remote.Session("user@https://AZKABANURL")
workflows = session.get_workflows("FlowFProjectName")
flows = workflows[u"flows"]
flow_id = flows[0]["flowId"]
workflows = session.get_workflow_info("FlowFProjectName", flow_id)
node_id = workflows["nodes"][0]["id"]现在我有了node_id,它是流程F中最后一个作业的名称,我如何在作业A中添加流F的依赖项?是这样的吗?
jobs["A"] = {
"type": "command",
"command": 'echo "Hello World"',
"dependencies": "F"
}做以下操作会让我在上传到Azkaban时出错(通过将这个作业A绑定到一个项目中):
jobs["a"] = Job({"type": "command", "command": 'echo "Hello World"',"dependencies": node_id})
这是一个错误:
azkaban.util.AzkabanError: Installation Failed.
Error found in upload. Cannot upload.
a cannot find dependency <node_id>在这里,node_id是我所模糊的工作的实际名称。
有人能建议我在作业中添加这些对外部流的依赖吗?外部流位于Azkaban上(这就是我必须使用Azkaban.remote的原因)。
发布于 2018-08-30 04:59:42
我找到了我问题的答案:
选项1:这更容易理解--您可以使用while循环不断地询问Azkaban某个特定的作业/流是否仍在运行。但是,在这样做时,您应该让while循环运行几个小时和几个小时,+检查流运行的方式是否使用get_running_workflows()方法。此方法不返回某个流的某个实例是否仍在运行,而是只返回所述流的任何实例是否正在运行。
选项2:如果流程F以作业f结尾,而流程F执行后需要作业A运行,则在流程F的末尾添加一个作业,例如f‘,以便f’调用作业A。
如果这很难理解的话:
原作业图:流程F,作业A所依赖的: f1 -> f2 -> .F
添加启动作业后: Flow f':f1 -> f2 -> .F -> f'
在这里,f‘应该包括一个session.run_workflow(project_A, flow_A)
这是一种比选项1更好的方法,因为您肯定知道,只有在流f成功执行之后才会启动作业A。我希望这对将来的人有帮助。
https://stackoverflow.com/questions/51600048
复制相似问题