首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python 3中生成器函数和类组合的反馈

Python 3中生成器函数和类组合的反馈
EN

Code Review用户
提问于 2020-09-06 12:04:24
回答 2查看 131关注 0票数 2

我编写了一个Python脚本,它的目的是从CloudWatch读取日志,然后将它们发布到ElasticSearch。它还没有完全完成,但我已经取得了足够大的进展,可以从Python专家的反馈中获益,特别是:

  • Python生成器的使用是否正确和惯用?
  • 类组成的用法正确吗?
  • 任何其他风格上的不适合Python 3。
代码语言:javascript
复制
#!/usr/bin/env python3

import json
import time
import uuid
import os
import sys

import boto3
from elasticsearch import Elasticsearch, helpers

client = boto3.client("logs")


def usage() -> None:
    print("Usage: GROUP_NAME=cloudwatch_group ES_HOST=es_host {}".format(
        os.path.basename(__file__)))
    sys.exit(1)

if "GROUP_NAME" not in os.environ:
    usage()
if "ES_HOST" not in os.environ:
    usage()


class CWLogs:
    group_name = os.environ["GROUP_NAME"]

    def events(self) -> None:
        for event in self.__generate_events():
            yield event

    def __generate_streams(self) -> None:
        kwargs = {
            "logGroupName": self.group_name,
        }

        while True:
            stream_batch = client.describe_log_streams(**kwargs)
            yield from stream_batch["logStreams"]
            try:
                kwargs["nextToken"] = stream_batch["nextToken"]
            except KeyError:
                break

    def __generate_events(self) -> None:
        stream_names = \
      [stream["logStreamName"] for stream in self.__generate_streams()]

        for stream_name in stream_names:
            kwargs = {
                "logGroupName":  self.group_name,
                "logStreamName": stream_name,
            }

            while True:
                logs_batch = client.get_log_events(**kwargs)
                yield from logs_batch["events"]
                try:
                    kwargs["nextToken"] = logs_batch["nextToken"]
                except KeyError:
                    break


class ESWriter:
    es_host = os.environ["ES_HOST"]
    elastic = Elasticsearch()

    def post(self, events: object) -> None:
        try:
            response = helpers.bulk(
                self.elastic, self.__transformer(events))
            print("\nRESPONSE:", response)
        except Exception as e:
            print("\nERROR:", e)

    @staticmethod
    def __index_name(timestamp: str) -> str:
        return "eventbridge-auth0-{}".format(
            time.strftime("%Y.%m", time.localtime(timestamp)))

    @staticmethod
    def __normalize(message: str) -> str:
        return message # TODO.

    def __transformer(self, events: object) -> None:
        for event in events:
            yield self.__transform(event)

    def __transform(self, event: dict) -> None:
        timestamp = event["timestamp"]
        index_name = self.__index_name(timestamp)
        message = self.__normalize(event["message"])

        return "\n".join([
            json.dumps({
                "index": {
                    "_id": str(uuid.uuid4()), # TODO. Check
                    "_index": index_name,
                    "_type":  "_doc"}}),
            json.dumps({
                "source": {
                    "@source": "auto-populate script",
                    "@timestamp": timestamp,
                    "@message": message}})])


if __name__ == '__main__':
    ESWriter().post(CWLogs().events())
EN

回答 2

Code Review用户

回答已采纳

发布于 2020-09-07 18:12:11

代码组织

您的代码组织似乎不存在。你有:

  • 导入
  • 代码
  • 功能定义
  • 代码
  • 类定义
  • 主保护码

代码应该以更一致的结构来组织,例如:

  • 导入
  • 类定义
  • 函数定义
  • 主保护码

如果将文件导入另一个文件,则使用主保护程序的目的是防止代码运行。在这里,您有两个独立的代码块,它们被无条件地执行。这限制了代码重用。例如,假设有人可以使用CWLogs完成自己的任务,但不需要ESWriter。他们尝试from your_file import CWLogs,并在显示了一条关于如何执行他们没有实际运行的程序的神秘错误消息之后,发现他们的程序退出了,因为他们没有实际使用一个环境变量。

sys.exit()

别这么叫。它终止Python解释器。

当程序完成时,您可能希望做的任何调试都是不可能的,因为整个Python环境都崩溃了。使用try: import your_file except ImportError:安全导入文件是不可能的,因为Python在导入过程中终止,这意味着程序试图无条件地导入它。如果您尝试使用unittest来测试您的程序,或者使用Sphinx为您的程序生成文档,或者使用其他许多常见的东西,您就不能使用,因为您的文件已经无条件地终止了Python解释器。

别叫了。

相反:

代码语言:javascript
复制
if __name__ == '__main__':
    if {'GROUP_NAME', 'ES_HOST'} <= os.environ.keys():
        main()
    else:
        usage()

usage()不需要调用sys.exit()。在调用usage()并正常返回之后,执行会到达文件的末尾,如果这是主程序文件,那么程序自然会结束。当然,如果这不是主程序文件,主保护程序就不会运行任何一种方法,执行就会到达文件的末尾,完成将文件作为另一个程序中的一个模块输入。

停止编写类

参见杰克·迪德里希( Jack )的"停止写作课“( PyCon )演讲。

没有实例数据成员的类可能不应该是类。ESWriterCWLogs都没有任何实例数据成员。

没有构造函数并且只有一个要调用的公共方法的类不应该是类。ESWriterCWLogs都没有构造函数。两者都有一个公共方法,在构造类实例后立即调用,因此实例甚至不会保存。

这些不应该是类。

私名损坏

当一个类从另一个类派生时,通常当基类和派生类处于不同实体的控制下时,私名毁损用于防止私有成员名称冲突。例如,如果从tkinter.Frame派生出自己的类,并在类中创建_validate方法,则如果基类有自己的_validate方法突然更改,则可以使它停止正常工作。因此,基类将使用__validate,前面的双下划线将触发名称"mangling",并将名称替换为_Frame__validate,因此冲突的可能性较小。

在您的方法名称中使用双下划线前缀似乎没有任何理由;一个下划线将更加惯用。

类型提示

你的提示是错误的。

例如,以下内容显然是返回一个str,而不是None

代码语言:javascript
复制
    def __transform(self, event: dict) -> None:
        ...
        return "\n".join( ... )

由于__transformer正在产生__transform的结果,所以它也不会返回None,而应该声明为:

代码语言:javascript
复制
from typing import Generator

...

    def __transformer(self, events: object) -> Generator[str, None, None]:
        ...

或者简单地说:

代码语言:javascript
复制
from typing import Iterator

...

    def __transformer(self, events: object) -> Iterator[str]:
        ...

events: object实际上是毫无意义的,因为Python中的所有东西都是对象。要么为它使用适当的类型,要么根本不去理会类型提示。

生成器表达式

正如叶波特尔齐科所指出的,

代码语言:javascript
复制
def __generate_events(self) -> None:
    stream_names = [stream["logStreamName"] for stream in self.__generate_streams()]

    for stream_name in stream_names:
        ...

构建一个临时列表,然后立即遍历它。为了避免临时列表,他们对代码做了相当大的修改。有一个小得多的变化可以做:

代码语言:javascript
复制
def __generate_events(self) -> None:
    stream_names = (stream["logStreamName"] for stream in self.__generate_streams())

    for stream_name in stream_names:
        ...

因为可能很难看到这种变化,我将放大它:[...]被更改为(...)。这意味着stream_names不是作为内存中的列表来实现的,而是变成一个生成器表达式,当询问时,它将生成一个值。

这里没有什么不同,但是如果stream_names被传递给一个函数,而不是在本地使用,yedpodtrziko提出的更改将需要更远的代码来接受stream_obj并提取函数中的流名。

票数 3
EN

Code Review用户

发布于 2020-09-07 04:27:25

不要硬编码类中的环境变量。而不是这样:

代码语言:javascript
复制
class CWLogs:
    group_name = os.environ["GROUP_NAME"]

这样做吧:

代码语言:javascript
复制
class CWLogs:
    group_name = None

    def __init__(self, group_name):
         self.group_name = group_name

if not GROUP_NAME := getenv('GROUP_NAME'):
    usage()
# pass the variable when initializing the class :
CWLogs(GROUP_NAME)

这将使代码更易于维护,因为它没有将代码紧密地绑定到env。变量,而不是您将通过的任何代码,这样编写此类代码的测试就更容易了。此外,您不必在两个位置重复该变量,这将增加您在一个地方输入错误或当功能发生变化时忘记在两个位置修改它的可能性。ESWriter类也是如此。

还有一个函数:

代码语言:javascript
复制
def __generate_events(self) -> None:
    stream_names = [stream["logStreamName"] for stream in self.__generate_streams()]

    for stream_name in stream_names:
        ...

这里您有一个不必要的额外循环和在内存中分配的额外列表。首先,您遍历从__generate_streams()返回的数据,然后再次迭代相同的数据。你可以这样做:

代码语言:javascript
复制
def __generate_events(self) -> None:
    for stream_obj in self.__generate_streams():
        stream_name = stream_obj['logStreamName']
票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/248999

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档