首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >计算dataproc集群中作业的开始时间和结束时间。

计算dataproc集群中作业的开始时间和结束时间。
EN

Code Review用户
提问于 2022-02-17 16:08:30
回答 2查看 75关注 0票数 2

我有下面的函数get_status_time,它计算火花作业的开始时间和结束时间,它已经完成了它的运行(状态可能失败或通过)。

它是工作的,但功能太复杂;我想微调它,以降低认知复杂性。

代码语言:javascript
复制
def run_sys_command(cmd):
    try:

        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True,
                                shell=True)
        s_output, s_err = proc.communicate()
        s_return = proc.returncode
        return s_return, s_output, s_err

    except Exception as e:
        logger.exception(e,stack_info=True)

def get_status_time(app_name):
    """returns the start and end time of the spark job running on dataproc cluster

    Args:
        app_name (str): application name
        region (str): region name

    Returns:
        list: [status_value, start_time, end_time]
    """
    try:
        end_time = get_today_dt()
        logger.info(f"the end_time is {end_time}")
        app_id = app_name

        cmd = "gcloud dataproc jobs describe {} --region={}".format(app_id, region)

        (ret, out, err) = run_sys_command(cmd)
        logger.info(f"return code :{ret} , out :{out} , err :{err}")

        split_out = out.split("\n")

        logger.info(f"the value of split_out is {split_out}")

        logical_bool,status_value, start_time= False,"UNACCESSED",""

        try:
            matches = split_out.index("  state: ERROR")
            print(f"matches are {matches}")
        except Exception as e:
            matches = 0

        if matches == 0:
            # Grab status
            for line in split_out:
                if logical_bool == False:
                    if "status:" in line:
                        logical_bool = True
                elif logical_bool == True:
                    status_value = line
                    break
        else:
            status_value = "FAILED"

        # Grab start_time
        logical_bool = False
        for line in split_out:
            if logical_bool == False:
                if "state: RUNNING" in line:
                    logical_bool = True
            elif logical_bool == True:
                start_time = line.replace("stateStartTime:", "").strip(" `'\n")
                logger.info(f"START TIME AFTER STRIP: {start_time}")
                start_time = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S")
                break

        status_value = status_value.replace("state:", "").strip()

        if status_value == "DONE":
           status_value = "SUCCEEDED"
        
        return [status_value, start_time, end_time]

    
    except Exception as e:
        logger.error(e, stack_info=True, exc_info=True)


if __name__ == "__main__":
    get_status_time('data-pipeline','us-east4') 
EN

回答 2

Code Review用户

回答已采纳

发布于 2022-02-17 19:49:43

将重复行处理逻辑输出到实用程序函数。当前实现中的大部分复杂性源于以下需求:(1)检查行列表;(2)找到满足某些条件的行;(3)获取下一行。您需要在两个地方这样做,而且两次都试图在常规for-循环的范围内使用布尔标志变量来管理状态。当你发现自己不止一次地做一些复杂的事情时,考虑编写一个函数。即使该函数模仿了当前的方法,简单地将该行为排除在外也是一个值得注意的改进。但是,我认为有一种更直观的方法可以使用时间-真循环和Python的next()函数来实现行为:

代码语言:javascript
复制
def get_line_after(lines, predicate):
    it = iter(lines)
    while True:
        line = next(it, None)
        if line is None:
            return None
        elif predicate(line):
            return next(it, None)

将复杂的解析细节转化为实用程序函数。当前实现中的其他复杂性涉及从子进程调用的输出中解析信息。您可以通过将这些烦人的细节转移到其他地方来简化主要功能。这种策略并没有真正减少代码的数量(稍微增加代码),但是它在两方面增加了清晰度,在后面的几点中详细说明了这一点。

主要功能变得小而清晰。它获得了一个常规的,一步一步的质量。它的工作是将任务委托给其他人并记录结果。其他一些注意事项:(1) region变量在您的代码中没有定义,所以我在这里将它添加到函数签名中;(2)为了简洁起见,我省略了记录调用;(3)限制了try--除了处理无法控制的事情(解析代码通常不满足该测试);(4)您的代码不清楚如何响应子进程调用的失败;(5)由于我们没有您的数据,所以我们无法运行代码,所以我的代码建议中可能会出现排印或错误。

代码语言:javascript
复制
def get_status_time(app_name, region):
    end_time = get_today_dt()

    cmd = 'gcloud dataproc jobs describe {} --region={}'.format(app_name, region)
    try:
        ret, out, err = run_sys_command(cmd)
    except Exception as e:
        # Return, raise, or have run_sys_command() log its own exception
        # and then return data, letting callers decide what to do rather
        # than requiring them to handle exceptions as well.
        return

    lines = out.split('\n')
    status_value = get_status_value(lines)
    start_time = get_start_time(lines)

    return [status_value, start_time, end_time]

效用函数的关注度越来越高。尽管实用程序函数仍然有些繁琐,但至少它们只关注问题的一些非常狭窄的部分。它们也很容易独立于程序的其他机器进行单元测试和调试。最后,由于它们的关注点较窄,它们往往需要生成较少的中间变量名称:相反,它们的工作只是返回一个答案。在大多数情况下,我不会在这些函数中进行任何日志记录。您可以做的另一个改进是将一些神奇的值(例如状态值和日期时间格式)转换为指定的常量。

代码语言:javascript
复制
def get_status_value(lines):
    if '  state: ERROR' in lines:
        return 'FAILED'
    else:
        predicate = lambda line: 'status:' in line
        line = get_line_after(lines, predicate)
        if line is None:
            return 'UNACCESSED'
        else:
            sval = line.replace('state:', '').strip()
            return 'SUCCEEDED' if sval == 'DONE' else sval

def get_start_time(lines):
    predicate = lambda line: 'state: RUNNING' in line
    line = get_line_after(lines, predicate)
    if line is None:
        return ''
    else:
        line = line.replace('stateStartTime:', "").strip(" `'\n")
        dt = datetime.strptime(start_time, '%Y-%m-%dT%H:%M:%S.%fZ')
        return dt.strftime('%Y-%m-%d %H:%M:%S')
票数 2
EN

Code Review用户

发布于 2022-02-18 02:14:26

它是工作的,但功能太复杂;我想微调它,以降低认知复杂性。

为了减少认知负荷,您已经做了一件最重要的事情--您编写了一个docstring来描述您的函数接受和返回的内容。使用docstring,我不需要读取您的函数。如果我不需要读你的函数,我不在乎你的函数有多复杂。

也就是说,其他评论也有很好的建议!

票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/274203

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档