我编写了一个Python脚本,它的目的是从CloudWatch读取日志,然后将它们发布到ElasticSearch。它还没有完全完成,但我已经取得了足够大的进展,可以从Python专家的反馈中获益,特别是:
#!/usr/bin/env python3
import json
import time
import uuid
import os
import sys
import boto3
from elasticsearch import Elasticsearch, helpers
client = boto3.client("logs")
def usage() -> None:
print("Usage: GROUP_NAME=cloudwatch_group ES_HOST=es_host {}".format(
os.path.basename(__file__)))
sys.exit(1)
if "GROUP_NAME" not in os.environ:
usage()
if "ES_HOST" not in os.environ:
usage()
class CWLogs:
group_name = os.environ["GROUP_NAME"]
def events(self) -> None:
for event in self.__generate_events():
yield event
def __generate_streams(self) -> None:
kwargs = {
"logGroupName": self.group_name,
}
while True:
stream_batch = client.describe_log_streams(**kwargs)
yield from stream_batch["logStreams"]
try:
kwargs["nextToken"] = stream_batch["nextToken"]
except KeyError:
break
def __generate_events(self) -> None:
stream_names = \
[stream["logStreamName"] for stream in self.__generate_streams()]
for stream_name in stream_names:
kwargs = {
"logGroupName": self.group_name,
"logStreamName": stream_name,
}
while True:
logs_batch = client.get_log_events(**kwargs)
yield from logs_batch["events"]
try:
kwargs["nextToken"] = logs_batch["nextToken"]
except KeyError:
break
class ESWriter:
es_host = os.environ["ES_HOST"]
elastic = Elasticsearch()
def post(self, events: object) -> None:
try:
response = helpers.bulk(
self.elastic, self.__transformer(events))
print("\nRESPONSE:", response)
except Exception as e:
print("\nERROR:", e)
@staticmethod
def __index_name(timestamp: str) -> str:
return "eventbridge-auth0-{}".format(
time.strftime("%Y.%m", time.localtime(timestamp)))
@staticmethod
def __normalize(message: str) -> str:
return message # TODO.
def __transformer(self, events: object) -> None:
for event in events:
yield self.__transform(event)
def __transform(self, event: dict) -> None:
timestamp = event["timestamp"]
index_name = self.__index_name(timestamp)
message = self.__normalize(event["message"])
return "\n".join([
json.dumps({
"index": {
"_id": str(uuid.uuid4()), # TODO. Check
"_index": index_name,
"_type": "_doc"}}),
json.dumps({
"source": {
"@source": "auto-populate script",
"@timestamp": timestamp,
"@message": message}})])
if __name__ == '__main__':
ESWriter().post(CWLogs().events())发布于 2020-09-07 18:12:11
您的代码组织似乎不存在。你有:
代码应该以更一致的结构来组织,例如:
如果将文件导入另一个文件,则使用主保护程序的目的是防止代码运行。在这里,您有两个独立的代码块,它们被无条件地执行。这限制了代码重用。例如,假设有人可以使用CWLogs完成自己的任务,但不需要ESWriter。他们尝试from your_file import CWLogs,并在显示了一条关于如何执行他们没有实际运行的程序的神秘错误消息之后,发现他们的程序退出了,因为他们没有实际使用一个环境变量。
别这么叫。它终止Python解释器。
当程序完成时,您可能希望做的任何调试都是不可能的,因为整个Python环境都崩溃了。使用try: import your_file except ImportError:安全导入文件是不可能的,因为Python在导入过程中终止,这意味着程序试图无条件地导入它。如果您尝试使用unittest来测试您的程序,或者使用Sphinx为您的程序生成文档,或者使用其他许多常见的东西,您就不能使用,因为您的文件已经无条件地终止了Python解释器。
别叫了。
相反:
if __name__ == '__main__':
if {'GROUP_NAME', 'ES_HOST'} <= os.environ.keys():
main()
else:
usage()usage()不需要调用sys.exit()。在调用usage()并正常返回之后,执行会到达文件的末尾,如果这是主程序文件,那么程序自然会结束。当然,如果这不是主程序文件,主保护程序就不会运行任何一种方法,执行就会到达文件的末尾,完成将文件作为另一个程序中的一个模块输入。
参见杰克·迪德里希( Jack )的"停止写作课“( PyCon )演讲。
没有实例数据成员的类可能不应该是类。ESWriter和CWLogs都没有任何实例数据成员。
没有构造函数并且只有一个要调用的公共方法的类不应该是类。ESWriter和CWLogs都没有构造函数。两者都有一个公共方法,在构造类实例后立即调用,因此实例甚至不会保存。
这些不应该是类。
当一个类从另一个类派生时,通常当基类和派生类处于不同实体的控制下时,私名毁损用于防止私有成员名称冲突。例如,如果从tkinter.Frame派生出自己的类,并在类中创建_validate方法,则如果基类有自己的_validate方法突然更改,则可以使它停止正常工作。因此,基类将使用__validate,前面的双下划线将触发名称"mangling",并将名称替换为_Frame__validate,因此冲突的可能性较小。
在您的方法名称中使用双下划线前缀似乎没有任何理由;一个下划线将更加惯用。
你的提示是错误的。
例如,以下内容显然是返回一个str,而不是None:
def __transform(self, event: dict) -> None:
...
return "\n".join( ... )由于__transformer正在产生__transform的结果,所以它也不会返回None,而应该声明为:
from typing import Generator
...
def __transformer(self, events: object) -> Generator[str, None, None]:
...或者简单地说:
from typing import Iterator
...
def __transformer(self, events: object) -> Iterator[str]:
...events: object实际上是毫无意义的,因为Python中的所有东西都是对象。要么为它使用适当的类型,要么根本不去理会类型提示。
正如叶波特尔齐科所指出的,
def __generate_events(self) -> None:
stream_names = [stream["logStreamName"] for stream in self.__generate_streams()]
for stream_name in stream_names:
...构建一个临时列表,然后立即遍历它。为了避免临时列表,他们对代码做了相当大的修改。有一个小得多的变化可以做:
def __generate_events(self) -> None:
stream_names = (stream["logStreamName"] for stream in self.__generate_streams())
for stream_name in stream_names:
...因为可能很难看到这种变化,我将放大它:[...]被更改为(...)。这意味着stream_names不是作为内存中的列表来实现的,而是变成一个生成器表达式,当询问时,它将生成一个值。
这里没有什么不同,但是如果stream_names被传递给一个函数,而不是在本地使用,yedpodtrziko提出的更改将需要更远的代码来接受stream_obj并提取函数中的流名。
发布于 2020-09-07 04:27:25
不要硬编码类中的环境变量。而不是这样:
class CWLogs:
group_name = os.environ["GROUP_NAME"]这样做吧:
class CWLogs:
group_name = None
def __init__(self, group_name):
self.group_name = group_name
if not GROUP_NAME := getenv('GROUP_NAME'):
usage()
# pass the variable when initializing the class :
CWLogs(GROUP_NAME)这将使代码更易于维护,因为它没有将代码紧密地绑定到env。变量,而不是您将通过的任何代码,这样编写此类代码的测试就更容易了。此外,您不必在两个位置重复该变量,这将增加您在一个地方输入错误或当功能发生变化时忘记在两个位置修改它的可能性。ESWriter类也是如此。
还有一个函数:
def __generate_events(self) -> None:
stream_names = [stream["logStreamName"] for stream in self.__generate_streams()]
for stream_name in stream_names:
...这里您有一个不必要的额外循环和在内存中分配的额外列表。首先,您遍历从__generate_streams()返回的数据,然后再次迭代相同的数据。你可以这样做:
def __generate_events(self) -> None:
for stream_obj in self.__generate_streams():
stream_name = stream_obj['logStreamName']https://codereview.stackexchange.com/questions/248999
复制相似问题