作为一名开发人员,我在我的生活中自然遇到过很多次python,但我不记得用python编写过任何程序。所以我肯定我的蟒蛇不是很惯用。让我们修好它!
这是一个自动解封哈希科普库的脚本,它的目的是从一个码头容器中运行,但是可以单独运行。它不接受任何命令行参数,而是从环境变量(向坞容器传递配置的一种常见方法)或来自配置文件的输入(我选择使用json格式)输入。
源代码可以找到这里并如下所示。
这三项投入是:
此外,还可以指定一个环境变量来启用启用更详细输出的调试模式。
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests
version = "0.1"
envPrefix = "VU_"
config = "vault-unseal.json"
def PrintWithTimestamp(string):
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
sys.stdout.write(f"[{timestamp}] {string}\n")
def PrintDebug(string):
if "VU_DEBUG" in os.environ:
PrintWithTimestamp(f"DEBUG: {string}")
def PrintParameterError(name):
PrintWithTimestamp(f"Error: {name} is not specified. Specify it either in {config} file, or as enviroment variable prefixed with {envPrefix}")
def NormalizePrefix(name, string):
return name + string[len(envPrefix+name):]
def ReadSingleSetting(name):
result = None
try:
with open(config) as data_file:
data = json.load(data_file)
result = data[name]
except (FileNotFoundError, KeyError):
pass
if envPrefix+name in os.environ:
result = os.environ[envPrefix+name]
return result
def ReadMultiSetting(name):
result = None
try:
with open(config) as data_file:
data = json.load(data_file)
result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
except (FileNotFoundError):
pass
result = result if result else {}
prefix = envPrefix + name
envResult = { NormalizePrefix(name,key):os.environ[key] for key in filter(lambda x: x.lower().startswith(prefix.lower()), list(os.environ.keys())) }
finalResult = list({**result,**envResult}.values())
finalResult = finalResult if len(finalResult) else None
return finalResult
def ReadSetting(name):
if name.endswith("*"):
return ReadMultiSetting(name[0:-1])
else:
return ReadSingleSetting(name)
PrintWithTimestamp(f"vault-unseal.py version {version}")
if "VU_DEBUG" in os.environ:
PrintDebug("Dumping environment block:")
pprint(dict(os.environ))
addressUrl = ReadSetting("ADDRESS_URL")
timeIntervalSeconds = int(ReadSetting("TIME_INTERVAL_SECONDS"))
unsealKeys = ReadSetting("UNSEAL_KEY_*")
if not addressUrl:
PrintParameterError("ADDRESS_URL")
sys.exit(1)
if not timeIntervalSeconds:
PrintParameterError("TIME_INTERVAL_SECONDS")
sys.exit(1)
if not unsealKeys:
PrintParameterError("UNSEAL_KEY_*")
sys.exit(1)
PrintWithTimestamp(f"ADDRESS_URL = {addressUrl}")
PrintWithTimestamp(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
PrintWithTimestamp("Number of unseal keys: " + str(len(unsealKeys)))
PrintDebug("UNSEAL_KEYS:")
for key in unsealKeys:
PrintDebug(f"- {key}")
PrintWithTimestamp("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
PrintWithTimestamp(f"Vault will be contacted every {timeIntervalSeconds} seconds")
PrintWithTimestamp("Run with environment variable VU_DEBUG set to 1 for debug output")
while True:
try:
r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
PrintDebug(f"status:{r}")
if "sealed" in r:
if r["sealed"] == True:
PrintWithTimestamp("Detected sealed vault. Unsealing...")
for key in unsealKeys:
PrintDebug(f"key:{key}")
r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
PrintDebug(f"unseal:{r}")
if r["sealed"] == True:
PrintWithTimestamp("something went wrong, failed to unseal. Check the keys")
PrintWithTimestamp(r)
sys.exit(2)
else:
PrintWithTimestamp("Unsealed successfully")
else:
PrintWithTimestamp("Error: cannot find 'sealed' in returned json")
pprint(r)
except Exception as e:
PrintWithTimestamp(f"Exception:{e}")
PrintWithTimestamp(type(e))
pprint(vars(e))
time.sleep(timeIntervalSeconds) 发布于 2018-01-03 10:59:11
我将尽量不重复@Josay的回答的大部分内容,因为它是在我使用您的代码时编写的。但要点仍然适用:
logging模块是你的朋友== True比较真值谈到logging模块,我通过删除pprint的使用简化了它的大部分使用。但是如果你愿意的话,你可以把它加回去,这是一些食谱。但是,请记住,使用像logger.debug这样的日志记录函数是为了最小化构建消息字符串所需的运行时,因此在这种上下文中使用f-字符串(否则会非常好)将导致在“非调试”运行时产生不必要的开销。哦,顺便说一句,sys.stdout.write('…\n')更好地理解为print('…')。
您还碰巧打开、读取和解析配置文件三次(每次设置一次),这是浪费资源。读取它一次,使用它的值更新os.environ字典并在结果字典中搜索您的设置。
当你试图打开保险库的时候也会发生同样的事情。你使用每个钥匙,然后,检查保险库是否已成功打开。一旦金库被打开,您可以在每个密钥之后检查以避免发送无关的请求。您还可以使用for .. else构造来检查是否没有密钥打开金库。
拟议改进:
import os
import sys
import json
import time
import logging
import requests
VERSION = '0.1'
ENV_PREFIX = 'VU_'
CONFIG_FILENAME = 'vault-unseal.json'
def configure_logger():
logging.basicConfig(stream=sys.stdout, format='[%(asctime)s] %(message)s')
logger = logging.getLogger('vault-unseal')
logger.setLevel(logging.DEBUG if 'VU_DEBUG' in os.environ else logging.INFO)
return logger
def read_configuration_file(filename=CONFIG_FILENAME):
try:
with open(filename) as f:
return json.load(f)
except (OSError, ValueError):
return {}
def read_setting(setting_name, parameters, logger):
if setting_name.endswith('*'):
prefix = setting_name[:-1]
setting = {key: value for key, value in parameters.items() if key.startswith(prefix)}
else:
setting = parameters.get(setting_name)
if not setting:
message = 'Error: %s is not specified. Specify it either in %s or as environment variable prefixed with %s'
logger.error(message, setting_name, CONFIG_FILENAME, ENV_PREFIX)
sys.exit(1)
return setting
def unseal(base_url, time_interval, unseal_keys, logger):
url = f'{base_url}/v1/sys/seal-status'
unseal_url = f'{base_url}/v1/sys/unseal'
while True:
try:
r = requests.get(url).json()
logger.debug('status: %s', r)
try:
sealed = r['sealed']
except KeyError:
logger.error('Error: cannot find \'sealed\' in returned JSON\n%s', r)
else:
if sealed:
logger.info('Detected sealed vault. Unsealing…')
for key_name, key_value in unseal_keys.items():
logger.debug('Using key %s (%s)', key_name, key_value)
r = requests.put(unseal_url, json={'key': key_value}).json()
if r['sealed']:
logger.debug('Unseal result: %s', r)
else:
logger.info('Unsealed successfully')
break
else:
logger.error('Something went wrong, failed to unseal. Check the keys.\n%s', r)
sys.exit(2)
except Exception:
logger.exception('An exception occured:')
time.sleep(time_interval)
def main():
logger = configure_logger()
logger.info('vault-unseal.py version %s', VERSION)
logger.debug('Dumping environment block: \n%s', os.environ)
# Filter environment variables of interest
settings = {key[len(ENV_PREFIX):]: value for key, value in os.environ.items() if key.startswith(ENV_PREFIX)}
# Update values with those found in configuration file
settings.update(read_configuration_file())
# Retrieve required parameters
address_url = read_setting('ADDRESS_URL', settings, logger)
time_interval = int(read_setting('TIME_INTERVAL_SECONDS', settings, logger))
unseal_keys = read_setting('UNSEAL_KEY_*', settings, logger)
logger.info('ADDRESS_URL = %s', address_url)
logger.info('TIME_INTERVAL_SECONDS = %d', time_interval)
logger.info('Number of unseal keys: %d', len(unseal_keys))
logger.debug('UNSEAL_KEYS:')
for key, value in unseal_keys.items():
logger.debug('- %s: %s', key, value)
logger.info('If you do not see any output below, it means that the vault is contacted successfully and its unsealed')
logger.info('Vault will be contacted every %d seconds', time_interval)
logger.info('Run with environment variable VU_DEBUG set for debug output')
unseal(address_url, time_interval, unseal_keys, logger)
if __name__ == '__main__':
main()发布于 2018-01-03 10:06:35
您的代码不遵循(Python 8)。在其他方面:
snake_caseUPPER_CASE我认为PrintXXX函数可以有较短的名称,如log和debug。此外,它还可以使用logging模块。
另外,在if __name__ == "__main__":守卫后面移动代码是一个很好的习惯。
最后,最好有一些关于模块和不同功能的文档。
在这个阶段,代码看起来是这样的(我还没有测试它)。
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests
VERSION = "0.1"
ENV_PREFIX = "VU_"
CONFIG_FILE = "vault-unseal.json"
def debug_is_enabled():
return ENV_PREFIX + "DEBUG" in os.environ
def log(string):
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
sys.stdout.write(f"[{timestamp}] {string}\n")
def debug(string):
if debug_is_enabled():
log(f"DEBUG: {string}")
def print_param_error(name):
log(f"Error: {name} is not specified. Specify it either in {CONFIG_FILE} file, or as enviroment variable prefixed with {ENV_PREFIX}")
def normalize_prefix(name, string):
return name + string[len(ENV_PREFIX+name):]
def read_single_setting(name):
result = None
try:
with open(CONFIG_FILE) as data_file:
data = json.load(data_file)
result = data[name]
except (FileNotFoundError, KeyError):
pass
if ENV_PREFIX+name in os.environ:
result = os.environ[ENV_PREFIX+name]
return result
def read_multi_setting(name):
result = None
try:
with open(CONFIG_FILE) as data_file:
data = json.load(data_file)
result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
except (FileNotFoundError):
pass
result = result if result else {}
prefix = ENV_PREFIX + name
env_result = { normalize_prefix(name,key):os.environ[key] for key in filter(lambda x: x.lower().startswith(prefix.lower()), list(os.environ.keys())) }
final_result = list({**result,**env_result}.values())
final_result = final_result if len(final_result) else None
return final_result
def read_setting(name):
if name.endswith("*"):
return read_multi_setting(name[0:-1])
else:
return read_single_setting(name)
if __name__ == "__main__":
log(f"vault-unseal.py VERSION {VERSION}")
if debug_is_enabled():
debug("Dumping environment block:")
pprint(dict(os.environ))
addressUrl = read_setting("ADDRESS_URL")
timeIntervalSeconds = int(read_setting("TIME_INTERVAL_SECONDS"))
unseal_keys = read_setting("UNSEAL_KEY_*")
if not addressUrl:
print_param_error("ADDRESS_URL")
sys.exit(1)
if not timeIntervalSeconds:
print_param_error("TIME_INTERVAL_SECONDS")
sys.exit(1)
if not unseal_keys:
print_param_error("UNSEAL_KEY_*")
sys.exit(1)
log(f"ADDRESS_URL = {addressUrl}")
log(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
log("Number of unseal keys: " + str(len(unseal_keys)))
debug("UNSEAL_KEYS:")
for key in unseal_keys:
debug(f"- {key}")
log("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
log(f"Vault will be contacted every {timeIntervalSeconds} seconds")
log("Run with environment variable VU_DEBUG set to 1 for debug output")
while True:
try:
r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
debug(f"status:{r}")
if "sealed" in r:
if r["sealed"] == True:
log("Detected sealed vault. Unsealing...")
for key in unseal_keys:
debug(f"key:{key}")
r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
debug(f"unseal:{r}")
if r["sealed"] == True:
log("something went wrong, failed to unseal. Check the keys")
log(r)
sys.exit(2)
else:
log("Unsealed successfully")
else:
log("Error: cannot find 'sealed' in returned json")
pprint(r)
except Exception as e:
log(f"Exception:{e}")
log(type(e))
pprint(vars(e))
time.sleep(timeIntervalSeconds)检查if "VU_DEBUG" in os.environ出现在多个地方。也许将它移到debug_is_enabled()函数中会更容易。此外,这还可以像代码的其他部分一样重用ENV_PREFIX常量:
def debug_is_enabled():
return ENV_PREFIX + "DEBUG" in os.environ在read_multi_setting中,您有:
result = None
// code_assigning_or_not_a_new_dict_to_result
result = result if result else {}最后一行只在仍然是result的情况下更改None。这样写起来就更直接了:
result = {}
// code_assigning_or_not_a_new_dict_to_result在read_single_setting中,您解析一个文件以获得一个将被环境变量覆盖的值。也许,只有在没有发现任何情况下,才能更容易地执行环境检查并解析该文件。
我会写一些东西,比如:
def read_single_setting(name):
if ENV_PREFIX+name in os.environ:
return os.environ[ENV_PREFIX+name]
try:
with open(CONFIG_FILE) as data_file:
data = json.load(data_file)
return data[name]
except (FileNotFoundError, KeyError):
pass
return None检索3种不同的设置,并检查是否为false。我不确定这是否有意这样做,但对于timeIntervalSeconds,在检查之前执行到int的转换。我可能会在检查后执行转换(同时,休眠0秒钟似乎是有效的)。
在read_multi_setting中,final_result最初是一个列表。然后,有:
final_result = final_result if len(final_result) else None这让我觉得很困惑。现在,final_result不是一直是一个列表,不是空的,也不是空的,现在不是非空的列表,就是没有的列表。它使函数更难正确地使用,同时也使其更加复杂。这可以简单地说:
return list({**result,**env_result}.values())您可以在循环之前调用prefix.lower()。
您可以在理解过程中使用if语法,以避免调用filter。
在这一阶段,你们有:
prefix = (ENV_PREFIX + name).lower()
env_result = { normalize_prefix(name,key):os.environ[key] for key in list(os.environ.keys()) if key.lower().startswith(prefix) }据我所知,您不需要转换到list。此外,您还可以使用items()直接迭代键和值:
env_result = { normalize_prefix(name, key): value for key, value in os.environ.items() if key.lower().startswith(prefix) }的比较
您不需要编写,if value == True:,您可以简单地编写if value:。
get从dict获得默认值的值。
与其检查一个值是否在dict中,然后得到该值,您还可以一次完成这两个操作。
if "sealed" in r:
if r["sealed"]:
if r.get("sealed", None):在此阶段,代码如下所示:
import os
import sys
import json
import time
import datetime
from pprint import pprint
import requests
VERSION = "0.1"
ENV_PREFIX = "VU_"
CONFIG_FILE = "vault-unseal.json"
def debug_is_enabled():
return ENV_PREFIX + "DEBUG" in os.environ
def log(string):
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
sys.stdout.write(f"[{timestamp}] {string}\n")
def debug(string):
if debug_is_enabled():
log(f"DEBUG: {string}")
def print_param_error(name):
log(f"Error: {name} is not specified. Specify it either in {CONFIG_FILE} file, or as enviroment variable prefixed with {ENV_PREFIX}")
def normalize_prefix(name, string):
return name + string[len(ENV_PREFIX+name):]
def read_single_setting(name):
if ENV_PREFIX+name in os.environ:
return os.environ[ENV_PREFIX+name]
try:
with open(CONFIG_FILE) as data_file:
data = json.load(data_file)
return data[name]
except (FileNotFoundError, KeyError):
pass
return None
def read_multi_setting(name):
result = {}
try:
with open(CONFIG_FILE) as data_file:
data = json.load(data_file)
result = { key:data[key] for key in filter(lambda x: x.startswith(name), data.keys()) }
except (FileNotFoundError):
pass
prefix = (ENV_PREFIX + name).lower()
env_result = { normalize_prefix(name, key): value for key, value in os.environ.items() if key.lower().startswith(prefix) }
return list({**result,**env_result}.values())
def read_setting(name):
if name.endswith("*"):
return read_multi_setting(name[0:-1])
else:
return read_single_setting(name)
if __name__ == "__main__":
log(f"vault-unseal.py VERSION {VERSION}")
if debug_is_enabled():
debug("Dumping environment block:")
pprint(dict(os.environ))
addressUrl = read_setting("ADDRESS_URL")
timeIntervalSeconds = int(read_setting("TIME_INTERVAL_SECONDS"))
unseal_keys = read_setting("UNSEAL_KEY_*")
if not addressUrl:
print_param_error("ADDRESS_URL")
sys.exit(1)
if not timeIntervalSeconds:
print_param_error("TIME_INTERVAL_SECONDS")
sys.exit(1)
if not unseal_keys:
print_param_error("UNSEAL_KEY_*")
sys.exit(1)
log(f"ADDRESS_URL = {addressUrl}")
log(f"TIME_INTERVAL_SECONDS = {timeIntervalSeconds}")
log("Number of unseal keys: " + str(len(unseal_keys)))
debug("UNSEAL_KEYS:")
for key in unseal_keys:
debug(f"- {key}")
log("If you do not see any output below, it means that the vault is contacted successfully and its unsealed")
log(f"Vault will be contacted every {timeIntervalSeconds} seconds")
log("Run with environment variable VU_DEBUG set to 1 for debug output")
while True:
try:
r = requests.get(f"{addressUrl}/v1/sys/seal-status").json()
debug(f"status:{r}")
if r.get("sealed", None):
log("Detected sealed vault. Unsealing...")
for key in unseal_keys:
debug(f"key:{key}")
r = requests.put(f"{addressUrl}/v1/sys/unseal", json = {"key":key}).json()
debug(f"unseal:{r}")
if r["sealed"]:
log("something went wrong, failed to unseal. Check the keys")
log(r)
sys.exit(2)
else:
log("Unsealed successfully")
else:
log("Error: cannot find 'sealed' in returned json")
pprint(r)
except Exception as e:
log(f"Exception:{e}")
log(type(e))
pprint(vars(e))
time.sleep(timeIntervalSeconds)https://codereview.stackexchange.com/questions/184122
复制相似问题