
Stack Exchange Graph Data - Helpers module

Asked by a Code Review user on 2019-08-28 19:10:26
1 answer · 79 views · 0 followers · score 6

Building on previous reviews

Whilst this isn't an iterative review, I've tried to follow the advice given on some of my previous questions:

  1. KISS. In some of my previous questions I over-complicated the design. Even though this program does a fair amount, I've tried to keep everything as simple as possible. I think I've achieved this in most of my code; only my coroutine helpers may be a bit much. Even then, I think the design is still fairly simple, as the magic is hidden behind a fully documented wrapper.
  2. Lack of documentation/comments. In this program I felt only one function needed a comment, as it performs some magic that I didn't really understand when implementing it. I've also added docstrings to everything, which removed the need for most comments. I feel some of my docstrings are good and achieve their goal of saying what the code does. However, I feel most are rather underwhelming, as they're little more than a comment echoing the name of the function/class/method they're "documenting".
  3. PEP 8 violations. I've run quite a few static analysis tools over my code. As far as I can tell, most of the issues these tools still raise are no longer PEP 8 problems - mostly mypy and pylint being unable to import the libraries I've used.

That said, if my code still shows any of these problems, it would be great to know how to fix them.

Don't forget an answer only needs to make one insightful observation.

What my program does

My complete code can be seen on GitHub, and the documentation can be built with tox -e docs. You can also view the documentation online. Whilst that page explains the entire codebase in more depth, here's a short description.

Running segd meta.codereview does the following:

  1. Downloads the Stack Exchange data dump for the wanted site, caching the archive in .cache/.
  2. Extracts the 7z archive, caching the files in .cache/meta.codereview.
  3. Reads the XML data to get the comments and posts.
  4. Extracts all links from the posts and comments. Comments are converted to HTML with docutils, and hyperlinks are extracted with Beautiful Soup.
  5. Saves all the links in meta.codereview.edges.csv, and post information in meta.codereview.nodes.csv.

My code

To prevent my question from becoming humongous, I've only included the code in the helpers package. This code runs independently of the rest of the code, but the helpers may depend on each other.

SI

Converts a number such as 100000 into a short form such as ' 97.66KiB'. The functions on the Magnitude class convert a value to the largest unit the base allows - for Magnitude.ibyte this means the value is at most 1023. display then shows the number to a set number of decimal places. This makes the library easy to use when displaying large amounts of data.

"""Simplify a number to a wanted base."""

import math
from typing import Callable, Tuple


def si_magnitude(
    base: int,
    suffix: str,
    prefixes: str,
) -> Callable[[int], Tuple[int, str]]:
    """
    SI base converter builder.

    :param base: Base to truncate values to.
    :param suffix: Suffix used to denote the type of information.
    :param prefixes: Prefixes before the suffix to denote magnitude.
    :return: A function to change a value by the above parameters.
    """
    prefixes = ' '.join(prefixes.split('|')[::-1])
    prefixes_ = prefixes.split(' ')

    def inner(value: int) -> Tuple[int, str]:
        """
        Convert a number to a truncated base form.

        :param value: Value to adjust.
        :return: Truncated value and unit.
        """
        logged = math.log(value, base)
        if -1 < value < 1:
            logged -= 1
        remainder = value / base ** int(logged)
        return remainder, prefixes_[int(logged)] + suffix
    return inner


_MAGNITUDE = 'f p n μ m| k M G T P E Z Y'


class Magnitude:
    """Magnitude conversions."""

    ibyte = si_magnitude(
        1024,
        'B',
        '| Ki Mi Gi Ti Pi Ei Zi Yi',
    )
    byte = si_magnitude(
        1000,
        'B',
        _MAGNITUDE,
    )
    number = si_magnitude(
        1000,
        '',
        _MAGNITUDE,
    )


def display(values: Tuple[int, str], decimal_places: int = 2) -> str:
    """
    Display a truncated number to a wanted DP.

    :param values: Value and unit to display.
    :param decimal_places: Amount of decimal places to display the value to.
    :return: Right aligned display value.
    """
    value, unit = values
    decimal_places = int(decimal_places)
    width = 4 + decimal_places
    if decimal_places > 0:
        return f'{value:>{width}.{decimal_places}f}{unit}'
    value = int(value)
    return f'{value:>3}{unit}'
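As a quick sanity check, the conversion above boils down to a log/remainder calculation. A minimal self-contained sketch of the same arithmetic (the names here are illustrative, not part of the module):

```python
import math

value, base = 100_000, 1024
# Index 0 is the empty prefix, matching the table si_magnitude builds
prefixes = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi']

magnitude = int(math.log(value, base))   # 1, so the 'Ki' prefix
remainder = value / base ** magnitude    # 97.65625, always below the base
print(f'{remainder:>6.2f}{prefixes[magnitude]}B')  # ' 97.66KiB'
```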

Progress

This wraps an iterator and displays how far through the iterator you are, along with some extra information such as the total size and the rate. It uses the SI module to display its numbers, so they come out in a human-readable form.

"""Display progress of a stream."""

import time
import warnings
from typing import Callable, Generic, Iterator, Optional, Tuple, TypeVar

from .si import Magnitude, display

# nosa(1): pylint[:Class name "T" doesn't conform to PascalCase naming style]
T = TypeVar('T')


# nosa(1): pylint[:Too many instance attributes]
class BaseProgressStream(Generic[T]):
    """Display the progress of a stream."""

    # nosa(1): pylint[:Too many arguments]
    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        si: Callable[[int], Tuple[int, str]],
        progress: Callable[[T], int],
        width: int = 20,
        prefix: str = '',
        start: int = 0,
        message: Optional[str] = None,
    ):
        """Initialize BaseProgressStream."""
        self.stream = stream
        self.size = size
        self.width = width
        self.progress_bar = '=' * (width - 1) + '>'
        self.prefix = prefix
        self.to_readable = si
        self.progress_fn = progress
        self._start = start
        self.message = message

    def _get_progress(self, current: int) -> str:
        """
        Get the progress of the stream.

        :param current: Current progress - not in percentage.
        :return: Progress bar and file size.
        """
        if not self.size:
            return ''
        amount = self.width * current // self.size
        progress = self.progress_bar[-amount:] if amount else ''
        disp_size = display(self.to_readable(self.size))
        return f'[{progress:<{self.width}}] {disp_size} '

    def __iter__(self) -> Iterator[T]:
        """
        Echo the stream, and update progress.

        Catches all warnings raised whilst processing the stream to be
        displayed afterwards. This keeps the UI tidy and prevents the
        progress bar traveling over multiple lines.

        :return: An echo of the input stream.
        """
        with warnings.catch_warnings(record=True) as warnings_:
            current = self._start
            if self.message:
                print(self.message)
            start = time.clock()
            for chunk in self.stream:
                current += self.progress_fn(chunk)
                progress = self._get_progress(current)
                rate = current // max(int(time.clock() - start), 1)
                disp_rate = display(self.to_readable(rate))
                print(
                    f'\r{self.prefix}{progress}{disp_rate}/s',
                    end='',
                    flush=True,
                )
                yield chunk
            print()
        for warning in warnings_:
            warnings.showwarning(
                warning.message,
                warning.category,
                warning.filename,
                warning.lineno,
            )


class DataProgressStream(BaseProgressStream[T]):
    """Display progress of a data stream."""

    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        width: int = 20,
        prefix: str = '',
        message: Optional[str] = None,
    ):
        """Initialize DataProgressStream."""
        super().__init__(
            stream,
            size,
            Magnitude.ibyte,
            len,
            width,
            prefix,
            0,
            message,
        )


class ItemProgressStream(BaseProgressStream[T]):
    """Display progress of an item stream."""

    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        width: int = 20,
        prefix: str = '',
        message: Optional[str] = None,
    ):
        """Initialize ItemProgressStream."""
        super().__init__(
            stream,
            size,
            Magnitude.number,
            lambda _: 1,
            width,
            prefix,
            1,
            message,
        )
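The bar drawn by _get_progress is plain integer arithmetic. A self-contained sketch of just that part, with values chosen for illustration:

```python
width, size, current = 20, 1000, 350

progress_bar = '=' * (width - 1) + '>'  # same shape as built in __init__
amount = width * current // size        # 7 of the 20 columns are filled
progress = progress_bar[-amount:] if amount else ''
print(f'[{progress:<{width}}]')         # [======>             ]
```

Slicing from the end of the bar keeps the `>` head on the filled portion, and the `<{width}` alignment pads the rest with spaces.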

Curl

This is a small library that mimics curl; it displays information about the download by using the progress module.

"""Copy URL."""

import os
import pathlib
from typing import Any

# nosa(1): pylint
import requests

from . import progress


def curl(
    path: pathlib.Path,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Download file to system.

    Provides a progress bar of the file being downloaded and some
    statistics around the file and download.

    :param path: Local path to save the file to.
    :param args&kwargs: Passed to :code:`requests.get`.
    """
    response = requests.get(*args, stream=True, **kwargs)
    response.raise_for_status()
    length_ = response.headers.get('content-length')
    length = int(length_) if length_ else None
    path.parent.mkdir(parents=True, exist_ok=True)

    print(f'Downloading: {response.url}')
    try:
        with path.open('wb') as output:
            for chunk in progress.DataProgressStream(
                response.iter_content(chunk_size=512),
                length,
                prefix='  ',
            ):
                output.write(chunk)
    except BaseException:
        os.remove(path)
        raise
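The except BaseException cleanup is the interesting part: a failed download never leaves a partial file behind. A minimal stdlib-only sketch of the same pattern (save_stream is a hypothetical stand-in, not part of the module):

```python
import os
import pathlib
import tempfile

def save_stream(path: pathlib.Path, chunks) -> None:
    # Mirror curl's cleanup: remove the partial file if writing fails.
    try:
        with path.open('wb') as output:
            for chunk in chunks:
                output.write(chunk)
    except BaseException:
        os.remove(path)
        raise

path = pathlib.Path(tempfile.mkdtemp()) / 'file.bin'
save_stream(path, iter([b'abc', b'def']))
print(path.read_bytes())  # b'abcdef'
```

Catching BaseException rather than Exception means even KeyboardInterrupt during a download removes the partial file before re-raising.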

Cache

Exposes a very simple cache. The returned objects all expose an ensure method to allow the action to happen at a later time, which means one cache can depend on another. So the archive extraction can depend on the file downloader if the wanted 7z archive doesn't exist.

"""
Simple file cache.

Exposes two forms of cache:

1. A file that is downloaded from a website.
2. A 7z archive cache - files that are extracted from a 7z archive.
"""

import pathlib

# nosa(1): pylint,mypy
import py7zlib

from . import curl, si


class CacheMethod:
    """Base cache object."""

    def __init__(self, cache_path: pathlib.Path) -> None:
        """Initialize CacheMethod."""
        self.cache_path = cache_path

    def _is_cached(self, use_cache: bool) -> bool:
        """
        Check if the target exist in the cache.

        :param use_cache: Set to false to force redownload the data.
        :return: True if we should use the cache.
        """
        return use_cache and self.cache_path.exists()

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        This should be overwritten in child classes.

        :param use_cache: Set to false to force redownload the data.
        :return: Location of file.
        """
        raise NotImplementedError('Should be overwritten in subclass.')


class FileCache(CacheMethod):
    """Exposes a cache that allows downloading files."""

    def __init__(self, cache_path: pathlib.Path, url: str) -> None:
        """Initialize FileCache."""
        super().__init__(cache_path)
        self.url = url

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        This curls the file from the web to cache, providing a progress
        bar whilst downloading.

        :param use_cache: Set to false to force redownload the data.
        :return: Location of file.
        """
        if not self._is_cached(use_cache):
            curl.curl(self.cache_path, self.url)
        return self.cache_path


class Archive7zCache(CacheMethod):
    """Exposes a cache that allows unzipping 7z archives."""

    def __init__(
        self,
        cache_path: pathlib.Path,
        archive_cache: CacheMethod,
    ) -> None:
        """Initialize Archive7zCache."""
        super().__init__(cache_path)
        self.archive_cache = archive_cache

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        Unzips the 7z archive showing the name and size of each file
        being extracted.

        :param use_cache: Set to false to force reunarchiving of the data.
        :return: Location of file.
        """
        if not self._is_cached(use_cache):
            with self.archive_cache.ensure(use_cache).open('rb') as input_file:
                print(f'Unzipping: {input_file.name}')
                archive = py7zlib.Archive7z(input_file)
                directory = self.cache_path.parent
                directory.mkdir(parents=True, exist_ok=True)
                for name in archive.getnames():
                    output = directory / name
                    member = archive.getmember(name)
                    size = si.display(si.Magnitude.ibyte(member.size))
                    print(f'  Unpacking[{size}] {name}')
                    with output.open('wb') as output_file:
                        output_file.write(archive.getmember(name).read())
        return self.cache_path


class Cache:
    """Interface to make cache instances."""

    def __init__(self, cache_dir: pathlib.Path) -> None:
        """Initialize Cache."""
        self.cache_dir = cache_dir

    def file(self, cache_path: str, url: str) -> FileCache:
        """
        Get a file cache endpoint.

        :param cache_path: Location of file relative to the cache directory.
        :param url: URL location of the file to download from if not cached.
        :return: A file cache endpoint.
        """
        return FileCache(self.cache_dir / cache_path, url)

    def archive_7z(
        self,
        cache_path: pathlib.Path,
        archive_cache: CacheMethod,
    ) -> Archive7zCache:
        """
        Get an archive cache endpoint.

        :param cache_path: Location of file relative to the cache directory.
        :param archive_cache: A cache endpoint to get the 7z archive from.
        :return: An archive cache endpoint.
        """
        return Archive7zCache(self.cache_dir / cache_path, archive_cache)
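The ensure chaining means a cache miss cascades upstream on demand. A toy stdlib-only version of that idea - both classes below are illustrative stand-ins for FileCache and Archive7zCache, not the real implementations:

```python
import pathlib
import tempfile

class DownloadCache:
    """Stand-in for FileCache: 'downloads' by writing placeholder bytes."""
    def __init__(self, path: pathlib.Path) -> None:
        self.path = path

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        if not (use_cache and self.path.exists()):
            self.path.parent.mkdir(parents=True, exist_ok=True)
            self.path.write_bytes(b'archive bytes')
        return self.path

class ExtractCache:
    """Stand-in for Archive7zCache: pulls its input from an upstream cache."""
    def __init__(self, path: pathlib.Path, upstream) -> None:
        self.path, self.upstream = path, upstream

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        if not (use_cache and self.path.exists()):
            # Cache miss: ask the upstream cache to materialize the archive.
            data = self.upstream.ensure(use_cache).read_bytes()
            self.path.write_bytes(data.upper())  # stand-in for extraction
        return self.path

root = pathlib.Path(tempfile.mkdtemp())
extracted = ExtractCache(root / 'data.xml', DownloadCache(root / 'data.7z'))
print(extracted.ensure().read_bytes())  # b'ARCHIVE BYTES'
```

Calling ensure on the downstream cache transparently triggers the "download" first, which is exactly the dependency described above.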

XRef

This expands Sphinx partial_xref objects into full URLs. This is needed because you can link to a post with [link description](/q/1). Secondly, Simon once left an example link not wrapped in backticks, which would otherwise blow the code up, so we handle that too - [link description](target) is an example.

"""Expand partial xrefs."""
# nosa: pylint,mypy

from typing import List, Type

import docutils.core
import docutils.nodes
import docutils.parsers
import docutils.transforms

from recommonmark.parser import CommonMarkParser

import sphinx.addnodes

__all__ = [
    'custom_parser',
]


def custom_parser(prefix: str) -> Type[docutils.parsers.Parser]:
    """
    Markdown parser with partial xref support.

    Extends :code:`recommonmark.parser.CommonMarkParser` to include
    the :code:`custom_parser.PendingXRefTransform` transform.

    :param prefix: Http base to prepend to partial hyperlinks.
    :return: A custom parser to parse Markdown.
    """
    class PendingXRefTransform(docutils.transforms.Transform):
        """
        Expands partial links.

        Some links are provided like :code:`[text](/a/2)`.
        This expands the link to include the basename like:

        .. code::

           http://codereview.meta.stackexchange.com
        """

        default_priority = 999

        @staticmethod
        def handle_xref(
                node: sphinx.addnodes.pending_xref,
        ) -> docutils.nodes.Node:
            """Convert partial_xref to desired output."""
            referance, = node.children
            ref = node.attributes['reftarget']
            if ref != referance.attributes['refuri']:
                print(
                    'target not the same',
                    node.attributes['reftarget'],
                    referance.attributes['refuri'],
                )

            if ref.startswith('/'):
                referance['refuri'] = prefix + ref
                return referance
            # Handles 'links' like [this other thing](link)
            text, = referance.children
            if not isinstance(text, docutils.nodes.Text):
                print('Referance text is not text.')
            return docutils.nodes.Text(f'[{text.rawsource}]({ref})')

        def traverse(self, node: docutils.nodes.Node) -> None:
            """Traverse the tree updating partial_xref nodes."""
            transforms = []
            children = []
            for child in getattr(node, 'children', [])[:]:
                if isinstance(child, sphinx.addnodes.pending_xref):
                    new_child = self.handle_xref(child)
                    transforms.append((child, new_child))
                    child = new_child
                children.append(child)

            replace = getattr(node, 'replace', None)
            if replace is not None:
                for old, new in transforms:
                    replace(old, new)

            for child in children:
                self.traverse(child)

        def apply(self) -> None:
            """Docutils entry."""
            self.traverse(self.document)

    class CustomParser(CommonMarkParser):
        """Subclass of CommonMark to add XRef transform."""

        def get_transforms(self) -> List[Type[docutils.transforms.Transform]]:
            """Get transformations used for this passer."""
            return [PendingXRefTransform]

    return CustomParser
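The transform's decision rule boils down to: site-relative targets get the prefix prepended, and anything else is rendered back as literal Markdown text. A tiny sketch of just that rule (expand_link is a hypothetical helper, not in the module):

```python
def expand_link(prefix: str, text: str, target: str) -> str:
    # Site-relative targets like '/q/1' get the site prefix prepended;
    # everything else is emitted back as literal Markdown text.
    if target.startswith('/'):
        return prefix + target
    return f'[{text}]({target})'

base = 'https://codereview.meta.stackexchange.com'
print(expand_link(base, 'link description', '/q/1'))
# https://codereview.meta.stackexchange.com/q/1
print(expand_link(base, 'link description', 'target'))
# [link description](target)
```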

Coroutines

The main function, coroutine, adds a lot of magic, which is better described in the code. For the most part it doesn't do much unless the control flow enters a bad state.

"""
Coroutine helpers.

A lot of this module is based on the assumption that Python doesn't
seamlessly handle the destruction of coroutines when using multiplexing
or broadcasting. It also helps ease interactions when coroutines enter
closed states prematurely.
"""

import functools
import itertools
import types
from typing import (
    Any, Callable, Generator, Iterable, Iterator, List, Optional, Tuple, Union,
)

NEW_SOURCE = object()
EXIT = object()

IIter = Union[Iterator, Iterable]


class CoroutineDelegator:
    """Helper class for delegating to coroutines."""

    _queue: List[Tuple[IIter, Generator]]

    def __init__(self) -> None:
        """Initialize CoroutineDelegator."""
        self._queue = []

    def send_to(
        self,
        source: IIter,
        target: Generator,
    ) -> None:
        """
        Add a source and target to send data to.

        This does not send any data into the target, to do that use the
        :meth:`CoroutineDelegator.run` function.

        :param source: Input data, can be any iterable. Each is passed
                       straight unaltered to target.
        :param target: This is the coroutine the data enters into to get
                       into the coroutine control flow.
        """
        self._queue.append((source, target))

    def _increment_coroutine_refs(self) -> None:
        """Increment the amount of sources for the coroutines."""
        for _, target in self._queue:
            if _is_magic_coroutine(target):
                target.send(NEW_SOURCE)

    def _run(self, source: IIter, target: Generator) -> Optional[Iterator]:
        item = sentinel = object()
        source_ = iter(source)
        try:
            for item in source_:
                target.send(item)
        except StopIteration:
            if item is sentinel:
                return source_
            return itertools.chain([item], source_)
        else:
            if _is_magic_coroutine(target):
                target.send(EXIT)
        return None

    def run(self) -> List[Iterator]:
        """
        Send all data into the coroutine control flow.

        :return: If a coroutine is closed prematurely the data that
                 hasn't been entered into the control flow will be
                 returned. Otherwise an empty list is.
        """
        self._increment_coroutine_refs()

        output: List[Optional[Iterator]] = [
            None for _ in range(len(self._queue))
        ]
        for i, (source, target) in enumerate(self._queue):
            output[i] = self._run(source, target)
        self._queue = []
        if any(output):
            return [iter(o or []) for o in output]
        return []


def primed_coroutine(function: Callable[..., Generator]) -> Callable:
    """
    Primes a coroutine at creation.

    :param function: A coroutine function.
    :return: The coroutine function wrapped to prime the coroutine at creation.
    """
    function = types.coroutine(function)

    def inner(*args: Any, **kwargs: Any) -> Generator:
        output = function(*args, **kwargs)
        next(output)
        return output

    return inner


def _is_magic_coroutine(target: Any) -> bool:
    """
    Check if target is a magic coroutine.

    :param target: An object to check against.
    :return: If the object is a magic coroutine.
    """
    try:
        return bool(
            target
            and target.__qualname__.endswith('coroutine.<locals>.magic'),
        )
    except Exception:
        return False


def coroutine(function: Callable) -> Callable:
    """
    Wrap a coroutine generating function to make magic coroutines.

    A magic coroutine is wrapped in a protective coroutine that eases
    the destruction of coroutine pipelines. This is because the
    coroutine is wrapped in a 'bubble' that:

    1. Primes the coroutine when the first element of data is passed to it.
    2. Sends information about the creation and destruction of other
       coroutines in the pipeline. This allows a coroutine to destroy
       itself when all providers have exited.
    3. Handles when a coroutine is being prematurely closed, if this is
       the case all target coroutines will be notified that some data
       sources are no longer available allowing them to deallocate
       themselves if needed.
    4. Handles situations where a target coroutine has been prematurely
       closed. In such a situation the current coroutine will be closed
       and exit with a StopIteration error, as if the coroutine has been
       closed with the :code:`.close`.

    It should be noted that these coroutine pipelines should be started via the
    :class:`stack_exchange_graph_data.helpers.coroutines.CoroutineDelegator`.
    This is as it correctly initializes the entry coroutine, and handles
    when the coroutine has been prematurely closed.

    :param function: Standard coroutine generator function.
    :return: Function that generates magic coroutines.
    """
    self: Generator

    @primed_coroutine
    def magic(*args: Any, **kwargs: Any) -> Generator:
        # Get magic coroutine targets
        targets_ = itertools.chain(args, kwargs.values())
        targets = [
            t
            for t in targets_
            if _is_magic_coroutine(t)
        ]

        # Create wrapped coroutine
        wrapped = function(*args, **kwargs)

        # Broadcast the creation of a new source to the targets
        for target in targets:
            target.send(NEW_SOURCE)

        sources = 0
        generator_exit_flag = False
        generator_iteration_flag = False
        active = False
        try:
            # Main coroutine loop handles adding and removing source counters.
            while True:
                item = yield
                if item is NEW_SOURCE:
                    sources += 1
                elif item is EXIT:
                    sources -= 1
                    if not sources:
                        break
                else:
                    # Allows coroutines to be uninitialized until
                    # they're needed to be active.
                    if not active:
                        next(wrapped)
                        active = True
                    wrapped.send(item)
        # Raised when a anything above parent has been killed
        except RuntimeError:
            pass
        # Raised when a parent has been killed
        except StopIteration:
            generator_iteration_flag = True
        # Raised when this is being killed via `.close`.
        except GeneratorExit:
            generator_exit_flag = True
        finally:
            # Close the wrapped coroutine
            # This happens first, so any code in a `finally` can
            # propagate correctly
            try:
                wrapped.close()
            except RuntimeError:
                pass

            # Decrement target coroutine's source counters
            if targets and not generator_iteration_flag:
                for target in targets:
                    try:
                        for _ in range(sources):
                            target.send(EXIT)
                    except StopIteration:
                        pass

            # Coroutine must yield when it's being killed. IDK why but it does.
            # But it's illegal to yield when a GeneratorExit has been raised.
            if not generator_exit_flag:
                yield

    @functools.wraps(function)
    def inner(*args: Any, **kwargs: Any) -> Generator:
        nonlocal self
        self = magic(*args, **kwargs)
        return self

    return inner


@coroutine
def broadcast(*targets: Generator) -> Generator:
    """Broadcast items to targets."""
    while True:
        item = yield
        for target in targets:
            target.send(item)


@coroutine
def file_sink(*args: Any, **kwargs: Any) -> Generator:
    """Send all data to a file."""
    with open(*args, **kwargs) as file_obj:
        while True:
            file_obj.write((yield))
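Stripped of the source-counting magic, the underlying pattern is the classic primed-coroutine pipeline. A minimal self-contained version, where these helpers are simplified stand-ins rather than the module's coroutine wrapper:

```python
def primed(fn):
    # Advance a fresh generator to its first yield so .send works immediately.
    def inner(*args, **kwargs):
        gen = fn(*args, **kwargs)
        next(gen)
        return gen
    return inner

@primed
def sink(results):
    while True:
        results.append((yield))

@primed
def broadcast(*targets):
    # Fan each received item out to every downstream coroutine.
    while True:
        item = yield
        for target in targets:
            target.send(item)

out_a, out_b = [], []
pipe = broadcast(sink(out_a), sink(out_b))
for item in [1, 2, 3]:
    pipe.send(item)
print(out_a, out_b)  # [1, 2, 3] [1, 2, 3]
```

The module above layers source counting, lazy priming, and teardown notification on top of this basic shape.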
1 Answer

Answered by a Code Review user, posted on 2019-08-29 00:19:52

You're storing the magnitudes like this:

_MAGNITUDE = 'f p n μ m| k M G T P E Z Y'

This is a serialized format that needs parsing. That's inconvenient and complicates your code. Just store tuples or dictionaries of the prefix strings and their magnitudes. A couple of options are:

(
    ('f', -15),
    ('p', -12),
    ('n', -9),
    # ...
)

(
    'mμnpf',    # Negative prefixes
    'kMGTPEZY'  # Positive prefixes
)

Otherwise... wow, that's a lot of code. It's not badly written, but for what is a data-processing tool I think it's suffering from some feature bloat. The coroutine implementation is interesting, but you just don't need that pile of code - seeing anything described as magic makes me pretty sad, especially in Python. For generic broadcasting, for example, you could just store iterables of function references.
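For instance, the first suggested layout removes the parsing step entirely. One possible sketch of using it (to_si and its cut-offs are illustrative, not part of the answer):

```python
PREFIXES = (
    ('f', -15), ('p', -12), ('n', -9), ('μ', -6), ('m', -3),
    ('', 0), ('k', 3), ('M', 6), ('G', 9), ('T', 12),
)

def to_si(value: float) -> str:
    # Pick the largest prefix whose magnitude the value reaches.
    for prefix, exponent in reversed(PREFIXES):
        if value >= 10 ** exponent:
            return f'{value / 10 ** exponent:.2f}{prefix}'
    return f'{value:.2f}'

print(to_si(2_500_000))  # 2.50M
```

No string splitting or reversing is needed; the data structure already pairs each prefix with its exponent.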

Score 4
Original content from Code Review.
Original link: https://codereview.stackexchange.com/questions/227035