我一直在使用AWS Glue工作流来编排批处理作业。为了限制批处理作业的处理,我们需要通过下推谓词。当我们单独运行Glue作业时,我们可以在运行时将下推谓词作为命令行参数传递(即aws粘合开始作业运行--作业名称foo.scala -参数-arg1 1-text${arg1}.)。但是,当我们使用胶水工作流来执行glue作业时,还不太清楚。
当我们使用AWS Glue工作流编排批处理作业时,我们可以在创建工作流时添加run属性。
我试过:
aws胶水启动-工作流-运行-名称工作流-名称: jq -r '.RunId‘ aws胶水put-工作流-运行-属性-名称工作流-名称-运行ID "ID“-运行-属性-下推谓词=”一些值“
我能够看到我使用put-工作流- run -属性传递的run属性。
aws胶水put-工作流-运行-属性-名称工作流-ID "ID“
但我无法在“Glue作业”中检测到“下推谓词”。知道如何在Glue作业中访问工作流的run属性吗?

发布于 2020-01-03 06:07:49
如果您使用python作为Glue作业的编程语言,那么您可以发出属性 API调用来检索属性并在Glue作业中使用它。
response = client.get_workflow_run_properties(
Name='string',
RunId='string'
)这将给您以下响应,您可以解析并使用它:
{
'RunProperties': {
'string': 'string'
}
}如果您正在使用scala,那么您可以使用等效的AWS。
发布于 2021-01-21 17:01:00
首先,您需要确保作业是从工作流中运行的:
def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
"""
get_worfklow_params is delegated to retrieve the WORKFLOW parameters
"""
glue_client = boto3.client("glue")
if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
workflow_args = glue_client.get_workflow_run_properties(Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
print("Found the following workflow args: \n{}".format(workflow_args))
return workflow_args
print("Unable to find run properties for this workflow!")
return None此方法将返回workflow输入参数的映射。
无法使用以下方法来检索给定参数:
def get_worfklow_param(args: Dict[str, str], arg) -> str:
"""
get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
"""
if args is None:
return None
return args[arg] if arg in args else None通过重用代码,我认为最好创建一个python (whl)模块,并在作业的脚本路径中设置模块。通过这种方式,您可以通过简单的导入来检索方法。
没有whl模块,您可以按照以下方式移动:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
import boto3
import sys
from typing import Dict
def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
"""
get_worfklow_params is delegated to retrieve the WORKFLOW parameters
"""
glue_client = boto3.client("glue")
if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
workflow_args = glue_client.get_workflow_run_properties(
Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
print("Found the following workflow args: \n{}".format(workflow_args))
return workflow_args
print("Unable to find run properties for this workflow!")
return None
def get_worfklow_param(args: Dict[str, str], arg) -> str:
"""
get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
"""
if args is None:
return None
return args[arg] if arg in args else None
_args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
worfklow_params = get_worfklow_params(_args)
job_run_id = get_worfklow_param(_args, "WORKFLOW_RUN_ID")
my_parameter= get_worfklow_param(_args, "WORKFLOW_CUSTOM_PARAMETER")发布于 2022-09-07 16:23:29
如果您使用工作流运行Glue作业,那么sys.argv (即列表)将在其中包含参数--WORKFLOW_NAME和--WORKFLOW_RUN_ID。您可以使用此事实检查Glue作业是否由Workflow运行,然后检索Workflow运行时属性
from awsglue.utils import getResolvedOptions
if '--WORKFLOW_NAME' in sys.argv and '--WORKFLOW_RUN_ID' in sys.argv:
glue_args = getResolvedOptions(
sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID']
)
workflow_args = glue_client.get_workflow_run_properties(
Name=glue_args['WORKFLOW_NAME'], RunId=glue_args['WORKFLOW_RUN_ID']
)["RunProperties"]
return {**workflow_args}
else:
raise Exception("GlueJobNotStartedByWorkflow")https://stackoverflow.com/questions/59570537
复制相似问题