首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >hashicorp库自动解封

hashicorp库自动解封
EN

Code Review用户
提问于 2018-01-02 20:08:58
回答 2查看 1.6K关注 0票数 7

作为一名开发人员,我在我的生活中自然遇到过很多次python,但我不记得用python编写过任何程序。所以我肯定我的蟒蛇不是很惯用。让我们修好它!

这是一个自动解封哈希科普库的脚本,它的目的是从一个码头容器中运行,但是可以单独运行。它不接受任何命令行参数,而是从环境变量(向坞容器传递配置的一种常见方法)或来自配置文件的输入(我选择使用json格式)输入。

源代码可以找到这里并如下所示。

这三项投入是:

  • 你想要自动解封的金库的网址(例如https://vault.rocks)
  • 如果金库被密封,则在检查之间休眠的秒数。
  • 解封键列表(您可能需要一个键,因为如果您不关心手动解封,则不太可能创建多个键)

此外,还可以指定一个环境变量来启用启用更详细输出的调试模式。

代码语言:javascript
复制
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests

version = "0.1"
envPrefix = "VU_"
config = "vault-unseal.json"

def PrintWithTimestamp(string):
  timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
  sys.stdout.write(f"[{timestamp}] {string}\n")

def PrintDebug(string):
  if "VU_DEBUG" in os.environ:
    PrintWithTimestamp(f"DEBUG: {string}")

def PrintParameterError(name):
  PrintWithTimestamp(f"Error: {name} is not specified. Specify it either in {config} file, or as enviroment variable prefixed with {envPrefix}")

def NormalizePrefix(name, string):
  return name + string[len(envPrefix+name):]

def ReadSingleSetting(name):
  result = None
  try:
    with open(config) as data_file:    
      data = json.load(data_file)
    result = data[name]
  except (FileNotFoundError, KeyError):
    pass
  if envPrefix+name in os.environ:
    result = os.environ[envPrefix+name]
  return result


def ReadMultiSetting(name):
  result = None
  try:
    with open(config) as data_file:    
      data = json.load(data_file)
    result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
  except (FileNotFoundError):
    pass

  result = result if result else {}

  prefix = envPrefix + name
  envResult = { NormalizePrefix(name,key):os.environ[key] for key in filter(lambda x: x.lower().startswith(prefix.lower()), list(os.environ.keys())) }

  finalResult = list({**result,**envResult}.values())
  finalResult = finalResult if len(finalResult) else None
  return finalResult

def ReadSetting(name):
  if name.endswith("*"):
    return ReadMultiSetting(name[0:-1])
  else:
    return ReadSingleSetting(name)   

PrintWithTimestamp(f"vault-unseal.py version {version}")

if "VU_DEBUG" in os.environ:
  PrintDebug("Dumping environment block:")
  pprint(dict(os.environ))

addressUrl = ReadSetting("ADDRESS_URL")
timeIntervalSeconds = int(ReadSetting("TIME_INTERVAL_SECONDS"))
unsealKeys = ReadSetting("UNSEAL_KEY_*")

if not addressUrl:
  PrintParameterError("ADDRESS_URL")
  sys.exit(1)

if not timeIntervalSeconds:
  PrintParameterError("TIME_INTERVAL_SECONDS")
  sys.exit(1)

if not unsealKeys:
  PrintParameterError("UNSEAL_KEY_*")
  sys.exit(1)

PrintWithTimestamp(f"ADDRESS_URL = {addressUrl}")
PrintWithTimestamp(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
PrintWithTimestamp("Number of unseal keys: " + str(len(unsealKeys)))

PrintDebug("UNSEAL_KEYS:")
for key in unsealKeys:
  PrintDebug(f"- {key}")
PrintWithTimestamp("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
PrintWithTimestamp(f"Vault will be contacted every {timeIntervalSeconds} seconds")
PrintWithTimestamp("Run with environment variable VU_DEBUG set to 1 for debug output")

while True:
  try:
    r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
    PrintDebug(f"status:{r}")
    if "sealed" in r:
      if r["sealed"] == True:
        PrintWithTimestamp("Detected sealed vault. Unsealing...")
        for key in unsealKeys:
          PrintDebug(f"key:{key}")
          r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
          PrintDebug(f"unseal:{r}")
        if r["sealed"] == True:
          PrintWithTimestamp("something went wrong, failed to unseal. Check the keys")
          PrintWithTimestamp(r)
          sys.exit(2)
        else:
          PrintWithTimestamp("Unsealed successfully")
    else:
      PrintWithTimestamp("Error: cannot find 'sealed' in returned json")
      pprint(r)
  except Exception as e:
    PrintWithTimestamp(f"Exception:{e}")
    PrintWithTimestamp(type(e))
    pprint(vars(e))
  time.sleep(timeIntervalSeconds)  
EN

回答 2

Code Review用户

回答已采纳

发布于 2018-01-03 10:59:11

我将尽量不重复@Josay的回答的大部分内容,因为它是在我使用您的代码时编写的。但要点仍然适用:

  • logging模块是你的朋友
  • 遵循PEP8惯例
  • 不要使用== True比较真值
  • 不要重复你自己

谈到logging模块,我通过删除pprint的使用简化了它的大部分使用。但是如果你愿意的话,你可以把它加回去,这是一些食谱。但是,请记住,使用像logger.debug这样的日志记录函数是为了最小化构建消息字符串所需的运行时,因此在这种上下文中使用f-字符串(否则会非常好)将导致在“非调试”运行时产生不必要的开销。哦,顺便说一句,sys.stdout.write('…\n')更好地理解为print('…')

您还碰巧打开、读取和解析配置文件三次(每次设置一次),这是浪费资源。读取它一次,使用它的值更新os.environ字典并在结果字典中搜索您的设置。

当你试图打开保险库的时候也会发生同样的事情。你使用每个钥匙,然后,检查保险库是否已成功打开。一旦金库被打开,您可以在每个密钥之后检查以避免发送无关的请求。您还可以使用for .. else构造来检查是否没有密钥打开金库。

拟议改进:

代码语言:javascript
复制
import os
import sys
import json
import time
import logging

import requests


VERSION = '0.1'
ENV_PREFIX = 'VU_'
CONFIG_FILENAME = 'vault-unseal.json'


def configure_logger():
    logging.basicConfig(stream=sys.stdout, format='[%(asctime)s] %(message)s')
    logger = logging.getLogger('vault-unseal')
    logger.setLevel(logging.DEBUG if 'VU_DEBUG' in os.environ else logging.INFO)
    return logger


def read_configuration_file(filename=CONFIG_FILENAME):
    try:
        with open(filename) as f:
            return json.load(f)
    except (OSError, ValueError):
        return {}


def read_setting(setting_name, parameters, logger):
    if setting_name.endswith('*'):
        prefix = setting_name[:-1]
        setting = {key: value for key, value in parameters.items() if key.startswith(prefix)}
    else:
        setting = parameters.get(setting_name)

    if not setting:
        message = 'Error: %s is not specified. Specify it either in %s or as environment variable prefixed with %s'
        logger.error(message, setting_name, CONFIG_FILENAME, ENV_PREFIX)
        sys.exit(1)

    return setting


def unseal(base_url, time_interval, unseal_keys, logger):
    url = f'{base_url}/v1/sys/seal-status'
    unseal_url = f'{base_url}/v1/sys/unseal'

    while True:
        try:
            r = requests.get(url).json()
            logger.debug('status: %s', r)

            try:
                sealed = r['sealed']
            except KeyError:
                logger.error('Error: cannot find \'sealed\' in returned JSON\n%s', r)
            else:
                if sealed:
                    logger.info('Detected sealed vault. Unsealing…')

                    for key_name, key_value in unseal_keys.items():
                        logger.debug('Using key %s (%s)', key_name, key_value)
                        r = requests.put(unseal_url, json={'key': key_value}).json()
                        if r['sealed']:
                            logger.debug('Unseal result: %s', r)
                        else:
                            logger.info('Unsealed successfully')
                            break
                    else:
                        logger.error('Something went wrong, failed to unseal. Check the keys.\n%s', r)
                        sys.exit(2)
        except Exception:
            logger.exception('An exception occured:')
        time.sleep(time_interval)


def main():
    logger = configure_logger()
    logger.info('vault-unseal.py version %s', VERSION)
    logger.debug('Dumping environment block: \n%s', os.environ)

    # Filter environment variables of interest
    settings = {key[len(ENV_PREFIX):]: value for key, value in os.environ.items() if key.startswith(ENV_PREFIX)}
    # Update values with those found in configuration file
    settings.update(read_configuration_file())

    # Retrieve required parameters
    address_url = read_setting('ADDRESS_URL', settings, logger)
    time_interval = int(read_setting('TIME_INTERVAL_SECONDS', settings, logger))
    unseal_keys = read_setting('UNSEAL_KEY_*', settings, logger)

    logger.info('ADDRESS_URL = %s', address_url)
    logger.info('TIME_INTERVAL_SECONDS = %d', time_interval)
    logger.info('Number of unseal keys: %d', len(unseal_keys))
    logger.debug('UNSEAL_KEYS:')
    for key, value in unseal_keys.items():
        logger.debug('- %s: %s', key, value)

    logger.info('If you do not see any output below, it means that the vault is contacted successfully and its unsealed')
    logger.info('Vault will be contacted every %d seconds', time_interval)
    logger.info('Run with environment variable VU_DEBUG set for debug output')

    unseal(address_url, time_interval, unseal_keys, logger)


if __name__ == '__main__':
    main()
票数 4
EN

Code Review用户

发布于 2018-01-03 10:06:35

风格

您的代码不遵循(Python 8)。在其他方面:

  • 您应该使用4空格缩进。
  • 变量名应该使用snake_case
  • 常量名称应该是UPPER_CASE
  • 您应该删除尾随的空格。

我认为PrintXXX函数可以有较短的名称,如logdebug。此外,它还可以使用logging模块

另外,在if __name__ == "__main__":守卫后面移动代码是一个很好的习惯。

最后,最好有一些关于模块和不同功能的文档。

在这个阶段,代码看起来是这样的(我还没有测试它)。

代码语言:javascript
复制
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests

VERSION = "0.1"
ENV_PREFIX = "VU_"
CONFIG_FILE = "vault-unseal.json"


def debug_is_enabled():
    return ENV_PREFIX + "DEBUG" in os.environ


def log(string):
    timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stdout.write(f"[{timestamp}] {string}\n")


def debug(string):
    if debug_is_enabled():
        log(f"DEBUG: {string}")


def print_param_error(name):
    log(f"Error: {name} is not specified. Specify it either in {CONFIG_FILE} file, or as enviroment variable prefixed with {ENV_PREFIX}")


def normalize_prefix(name, string):
    return name + string[len(ENV_PREFIX+name):]


def read_single_setting(name):
    result = None
    try:
        with open(CONFIG_FILE) as data_file:
            data = json.load(data_file)
        result = data[name]
    except (FileNotFoundError, KeyError):
        pass
    if ENV_PREFIX+name in os.environ:
        result = os.environ[ENV_PREFIX+name]
    return result


def read_multi_setting(name):
    result = None
    try:
        with open(CONFIG_FILE) as data_file:
            data = json.load(data_file)
        result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
    except (FileNotFoundError):
        pass

    result = result if result else {}

    prefix = ENV_PREFIX + name
    env_result = { normalize_prefix(name,key):os.environ[key] for key in filter(lambda x: x.lower().startswith(prefix.lower()), list(os.environ.keys())) }

    final_result = list({**result,**env_result}.values())
    final_result = final_result if len(final_result) else None
    return final_result


def read_setting(name):
    if name.endswith("*"):
        return read_multi_setting(name[0:-1])
    else:
        return read_single_setting(name)



if __name__ == "__main__":
    log(f"vault-unseal.py VERSION {VERSION}")

    if debug_is_enabled():
        debug("Dumping environment block:")
        pprint(dict(os.environ))

    addressUrl = read_setting("ADDRESS_URL")
    timeIntervalSeconds = int(read_setting("TIME_INTERVAL_SECONDS"))
    unseal_keys = read_setting("UNSEAL_KEY_*")

    if not addressUrl:
        print_param_error("ADDRESS_URL")
        sys.exit(1)

    if not timeIntervalSeconds:
        print_param_error("TIME_INTERVAL_SECONDS")
        sys.exit(1)

    if not unseal_keys:
        print_param_error("UNSEAL_KEY_*")
        sys.exit(1)

    log(f"ADDRESS_URL = {addressUrl}")
    log(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
    log("Number of unseal keys: " + str(len(unseal_keys)))

    debug("UNSEAL_KEYS:")
    for key in unseal_keys:
        debug(f"- {key}")
    log("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
    log(f"Vault will be contacted every {timeIntervalSeconds} seconds")
    log("Run with environment variable VU_DEBUG set to 1 for debug output")

    while True:
        try:
            r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
            debug(f"status:{r}")
            if "sealed" in r:
                if r["sealed"] == True:
                    log("Detected sealed vault. Unsealing...")
                    for key in unseal_keys:
                        debug(f"key:{key}")
                        r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
                        debug(f"unseal:{r}")
                    if r["sealed"] == True:
                        log("something went wrong, failed to unseal. Check the keys")
                        log(r)
                        sys.exit(2)
                    else:
                        log("Unsealed successfully")
            else:
                log("Error: cannot find 'sealed' in returned json")
                pprint(r)
        except Exception as e:
            log(f"Exception:{e}")
            log(type(e))
            pprint(vars(e))
        time.sleep(timeIntervalSeconds)

不要重复你自己

检查if "VU_DEBUG" in os.environ出现在多个地方。也许将它移到debug_is_enabled()函数中会更容易。此外,这还可以像代码的其他部分一样重用ENV_PREFIX常量:

代码语言:javascript
复制
def debug_is_enabled():
    return ENV_PREFIX + "DEBUG" in os.environ

选择一个明智的默认值

read_multi_setting中,您有:

代码语言:javascript
复制
result = None
// code_assigning_or_not_a_new_dict_to_result
result = result if result else {}

最后一行只在仍然是result的情况下更改None。这样写起来就更直接了:

代码语言:javascript
复制
result = {}
// code_assigning_or_not_a_new_dict_to_result

不执行无用的操作

read_single_setting中,您解析一个文件以获得一个将被环境变量覆盖的值。也许,只有在没有发现任何情况下,才能更容易地执行环境检查并解析该文件。

我会写一些东西,比如:

代码语言:javascript
复制
def read_single_setting(name):
    if ENV_PREFIX+name in os.environ:
        return os.environ[ENV_PREFIX+name]
    try:
        with open(CONFIG_FILE) as data_file:
            data = json.load(data_file)
        return data[name]
    except (FileNotFoundError, KeyError):
        pass
    return None

设置检查

检索3种不同的设置,并检查是否为false。我不确定这是否有意这样做,但对于timeIntervalSeconds,在检查之前执行到int的转换。我可能会在检查后执行转换(同时,休眠0秒钟似乎是有效的)。

一致返回值

read_multi_setting中,final_result最初是一个列表。然后,有:

代码语言:javascript
复制
final_result = final_result if len(final_result) else None

这让我觉得很困惑。现在,final_result不是一直是一个列表,不是空的,也不是空的,现在不是非空的列表,就是没有的列表。它使函数更难正确地使用,同时也使其更加复杂。这可以简单地说:

代码语言:javascript
复制
return list({**result,**env_result}.values())

简化dict理解

您可以在循环之前调用prefix.lower()

您可以在理解过程中使用if语法,以避免调用filter

在这一阶段,你们有:

代码语言:javascript
复制
prefix = (ENV_PREFIX + name).lower()
env_result = { normalize_prefix(name,key):os.environ[key] for key in list(os.environ.keys()) if key.lower().startswith(prefix) }

据我所知,您不需要转换到list。此外,您还可以使用items()直接迭代键和值:

代码语言:javascript
复制
env_result = { normalize_prefix(name, key): value for key, value in os.environ.items() if key.lower().startswith(prefix) }

与真

的比较

您不需要编写,if value == True:,您可以简单地编写if value:

使用get从dict获得默认值

的值。

与其检查一个值是否在dict中,然后得到该值,您还可以一次完成这两个操作。

代码语言:javascript
复制
        if "sealed" in r:
            if r["sealed"]:

        if r.get("sealed", None):

在此阶段,代码如下所示:

代码语言:javascript
复制
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests

VERSION = "0.1"
ENV_PREFIX = "VU_"
CONFIG_FILE = "vault-unseal.json"


def debug_is_enabled():
    return ENV_PREFIX + "DEBUG" in os.environ


def log(string):
    timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stdout.write(f"[{timestamp}] {string}\n")


def debug(string):
    if debug_is_enabled():
        log(f"DEBUG: {string}")


def print_param_error(name):
    log(f"Error: {name} is not specified. Specify it either in {CONFIG_FILE} file, or as enviroment variable prefixed with {ENV_PREFIX}")


def normalize_prefix(name, string):
    return name + string[len(ENV_PREFIX+name):]


def read_single_setting(name):
    if ENV_PREFIX+name in os.environ:
        return os.environ[ENV_PREFIX+name]
    try:
        with open(CONFIG_FILE) as data_file:
            data = json.load(data_file)
        return data[name]
    except (FileNotFoundError, KeyError):
        pass
    return None


def read_multi_setting(name):
    result = {}
    try:
        with open(CONFIG_FILE) as data_file:
            data = json.load(data_file)
        result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
    except (FileNotFoundError):
        pass

    prefix = (ENV_PREFIX + name).lower()
    env_result = { normalize_prefix(name, key): value for key, value in os.environ.items() if key.lower().startswith(prefix) }

    return list({**result,**env_result}.values())


def read_setting(name):
    if name.endswith("*"):
        return read_multi_setting(name[0:-1])
    else:
        return read_single_setting(name)



if __name__ == "__main__":
    log(f"vault-unseal.py VERSION {VERSION}")

    if debug_is_enabled():
        debug("Dumping environment block:")
        pprint(dict(os.environ))

    addressUrl = read_setting("ADDRESS_URL")
    timeIntervalSeconds = int(read_setting("TIME_INTERVAL_SECONDS"))
    unseal_keys = read_setting("UNSEAL_KEY_*")

    if not addressUrl:
        print_param_error("ADDRESS_URL")
        sys.exit(1)

    if not timeIntervalSeconds:
        print_param_error("TIME_INTERVAL_SECONDS")
        sys.exit(1)

    if not unseal_keys:
        print_param_error("UNSEAL_KEY_*")
        sys.exit(1)

    log(f"ADDRESS_URL = {addressUrl}")
    log(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
    log("Number of unseal keys: " + str(len(unseal_keys)))

    debug("UNSEAL_KEYS:")
    for key in unseal_keys:
        debug(f"- {key}")
    log("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
    log(f"Vault will be contacted every {timeIntervalSeconds} seconds")
    log("Run with environment variable VU_DEBUG set to 1 for debug output")

    while True:
        try:
            r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
            debug(f"status:{r}")
            if r.get("sealed", None):
                log("Detected sealed vault. Unsealing...")
                for key in unseal_keys:
                    debug(f"key:{key}")
                    r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
                    debug(f"unseal:{r}")
                if r["sealed"]:
                    log("something went wrong, failed to unseal. Check the keys")
                    log(r)
                    sys.exit(2)
                else:
                    log("Unsealed successfully")
            else:
                log("Error: cannot find 'sealed' in returned json")
                pprint(r)
        except Exception as e:
            log(f"Exception:{e}")
            log(type(e))
            pprint(vars(e))
        time.sleep(timeIntervalSeconds)
票数 4
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/184122

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档