首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何访问Glue作业中AWS Glue工作流的运行属性?

如何访问Glue作业中AWS Glue工作流的运行属性?
EN

Stack Overflow用户
提问于 2020-01-02 21:57:02
回答 3查看 2.9K关注 0票数 1

我一直在使用AWS Glue工作流来编排批处理作业。为了限制批处理作业的处理,我们需要通过下推谓词。当我们单独运行Glue作业时,我们可以在运行时将下推谓词作为命令行参数传递(即aws粘合开始作业运行--作业名称foo.scala -参数-arg1 1-text${arg1}.)。但是,当我们使用胶水工作流来执行glue作业时,还不太清楚。

当我们使用AWS Glue工作流编排批处理作业时,我们可以在创建工作流时添加run属性。

  1. 我可以使用run属性传递我的Glue作业的下推谓词吗?
  2. 如果是,那么如何在运行时为run属性(按下谓词)定义值。我想在运行时为下推谓词定义值的原因是,谓词每天都在任意变化。(即运行胶水-过去10天、过去20天、过去2天等)

我试过:

aws胶水启动-工作流-运行-名称工作流-名称: jq -r '.RunId‘ aws胶水put-工作流-运行-属性-名称工作流-名称-运行ID "ID“-运行-属性-下推谓词=”一些值“

我能够看到我使用put-工作流- run -属性传递的run属性。

aws胶水put-工作流-运行-属性-名称工作流-ID "ID“

但我无法在“Glue作业”中检测到“下推谓词”。知道如何在Glue作业中访问工作流的run属性吗?

EN

回答 3

Stack Overflow用户

发布于 2020-01-03 06:07:49

如果您使用python作为Glue作业的编程语言,那么您可以发出属性 API调用来检索属性并在Glue作业中使用它。

代码语言:javascript
复制
response = client.get_workflow_run_properties(
    Name='string',
    RunId='string'
)

这将给您以下响应,您可以解析并使用它:

代码语言:javascript
复制
{
    'RunProperties': {
        'string': 'string'
    }
}

如果您正在使用scala,那么您可以使用等效的AWS。

票数 1
EN

Stack Overflow用户

发布于 2021-01-21 17:01:00

首先,您需要确保作业是从工作流中运行的:

代码语言:javascript
复制
def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
    """
    get_worfklow_params is delegated to retrieve the WORKFLOW parameters
    """
    glue_client = boto3.client("glue")
    if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
        workflow_args = glue_client.get_workflow_run_properties(Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
        print("Found the following workflow args: \n{}".format(workflow_args))
        return workflow_args
    print("Unable to find run properties for this workflow!")
    return None

此方法将返回workflow输入参数的映射。

无法使用以下方法来检索给定参数:

代码语言:javascript
复制
def get_worfklow_param(args: Dict[str, str], arg) -> str:
    """
    get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
    """
    if args is None:
        return None
    return args[arg] if arg in args else None

通过重用代码,我认为最好创建一个python (whl)模块,并在作业的脚本路径中设置模块。通过这种方式,您可以通过简单的导入来检索方法。

没有whl模块,您可以按照以下方式移动:

代码语言:javascript
复制
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    import boto3
    import sys
    from typing import Dict

    def get_worfklow_params(args: Dict[str, str]) -> Dict[str, str]:
    """
    get_worfklow_params is delegated to retrieve the WORKFLOW parameters
    """
    glue_client = boto3.client("glue")
    if "WORKFLOW_NAME" in args and "WORKFLOW_RUN_ID" in args:
        workflow_args = glue_client.get_workflow_run_properties(
            Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID'])["RunProperties"]
        print("Found the following workflow args: \n{}".format(workflow_args))
        return workflow_args
    print("Unable to find run properties for this workflow!")
    return None

    def get_worfklow_param(args: Dict[str, str], arg) -> str:
    """
    get_worfklow_param is delegated to verify if the given parameter is present in the job and return it. In case of no presence None will be returned
    """
    if args is None:
        return None
    return args[arg] if arg in args else None

    _args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    worfklow_params = get_worfklow_params(_args)


    job_run_id = get_worfklow_param(_args, "WORKFLOW_RUN_ID")
    my_parameter= get_worfklow_param(_args, "WORKFLOW_CUSTOM_PARAMETER")
票数 0
EN

Stack Overflow用户

发布于 2022-09-07 16:23:29

如果您使用工作流运行Glue作业,那么sys.argv (即列表)将在其中包含参数--WORKFLOW_NAME--WORKFLOW_RUN_ID。您可以使用此事实检查Glue作业是否由Workflow运行,然后检索Workflow运行时属性

代码语言:javascript
复制
from awsglue.utils import getResolvedOptions

if '--WORKFLOW_NAME' in sys.argv and '--WORKFLOW_RUN_ID' in sys.argv:
    glue_args = getResolvedOptions(
        sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID']
    )

    workflow_args = glue_client.get_workflow_run_properties(
        Name=glue_args['WORKFLOW_NAME'], RunId=glue_args['WORKFLOW_RUN_ID']
    )["RunProperties"]

    return {**workflow_args}
else:
    raise Exception("GlueJobNotStartedByWorkflow")
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59570537

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档