虽然我不是迭代评审,但我尝试遵循对我以前的一些问题提出的建议:
因此,如果我的代码显示了这些问题中的任何一个,那么很高兴知道如何修复它。
我的完整代码可以在 GitHub 上查看,文档可以使用 tox -e docs 构建。你也可以选择在线查看文档。虽然文档页面更深入地解释了整个代码,但下面是一个简短的描述。
运行segd meta.codereview是这样的:
在 .cache/ 中缓存归档文件,在 .cache/meta.codereview 中缓存提取出的文件,并将信息输出到 meta.codereview.edges.csv 和 meta.codereview.nodes.csv 中。为了避免我的问题变得过于庞大,我只在这里包含了 helpers 包中的代码。这些代码独立于其余代码运行,但可以相互依赖。
将数字(如100000 )转换为短格式(如' 97.66KiB' )。Magnitude类中的函数用于将值转换为基大小最多的值。对于Magnitude.ibyte来说,这意味着最多只能达到1023。然后使用display将数字显示到一定数量的小数位。这使得在显示大量数据时很容易使用库。
"""Simplify a number to a wanted base."""
import math
from typing import Callable, Tuple
def si_magnitude(
    base: int,
    suffix: str,
    prefixes: str,
) -> Callable[[int], Tuple[int, str]]:
    """
    SI base converter builder.

    :param base: Base to truncate values to.
    :param suffix: Suffix used to denote the type of information.
    :param prefixes: Prefixes before the suffix to denote magnitude.
        Space separated, with ``|`` splitting sub-unit prefixes (left)
        from multiplier prefixes (right).
    :return: A function to change a value by the above parameters.
    """
    # Rebuild the prefix table so index 0 is the empty (unit) prefix,
    # positive indexes are the growing magnitudes (k, M, ...) and
    # negative indexes wrap around to the shrinking ones (m, μ, ...).
    prefixes = ' '.join(prefixes.split('|')[::-1])
    prefixes_ = prefixes.split(' ')

    def inner(value: int) -> Tuple[int, str]:
        """
        Convert a number to a truncated base form.

        :param value: Value to adjust.
        :return: Truncated value and unit.
        """
        # log() is undefined at zero; report it with the unit prefix.
        if value == 0:
            return value, prefixes_[0] + suffix
        # floor() (rather than int() plus a fix-up for small values) is
        # also correct at exact negative powers: 0.001 in base 1000 now
        # gives (1.0, 'm') instead of (1000.0, 'μ'). abs() keeps
        # negative values from crashing log().
        exponent = math.floor(math.log(abs(value), base))
        remainder = value / base ** exponent
        return remainder, prefixes_[exponent] + suffix
    return inner
# Shared SI prefix table: entries are space separated, with '|' splitting
# the sub-unit prefixes (femto..milli, reached via negative indexes) from
# the multiplier prefixes (kilo..yotta).
_MAGNITUDE = 'f p n μ m| k M G T P E Z Y'
class Magnitude:
    """Magnitude conversions."""

    # Binary (IEC) byte converter: 1024 based, e.g. 100000 -> (97.65625, 'KiB').
    ibyte = si_magnitude(
        1024,
        'B',
        '| Ki Mi Gi Ti Pi Ei Zi Yi',
    )
    # Decimal (SI) byte converter: 1000 based, e.g. 100000 -> (100.0, 'kB').
    byte = si_magnitude(
        1000,
        'B',
        _MAGNITUDE,
    )
    # Plain number converter: 1000 based with no unit suffix.
    number = si_magnitude(
        1000,
        '',
        _MAGNITUDE,
    )
def display(values: Tuple[int, str], decimal_places: int = 2) -> str:
    """
    Display a truncated number to a wanted DP.

    :param values: Value and unit to display, as built by the
        :func:`si_magnitude` converters.
    :param decimal_places: Amount of decimal places to display the value to.
    :return: Right aligned display value.
    """
    value, unit = values
    # Coerce so a non-int argument can't break the format spec below.
    decimal_places = int(decimal_places)
    # Up to 3 integer digits + the decimal point + the requested decimals.
    width = 4 + decimal_places
    if decimal_places > 0:
        return f'{value:>{width}.{decimal_places}f}{unit}'
    # No decimals wanted: drop the fraction and the decimal point's column.
    value = int(value)
    return f'{value:>3}{unit}'它封装了一个迭代器,并显示了您在迭代器中的距离,并提供了一些额外的信息,比如总大小和速度。它使用SI模块显示它的数字,因此它们以人类可读的形式出现。
"""Display progress of a stream."""
import time
import warnings
from typing import Callable, Generic, Iterator, Optional, Tuple, TypeVar
from .si import Magnitude, display
# nosa(1): pylint[:Class name "T" doesn't conform to PascalCase naming style]
# Generic item type of the wrapped stream.
T = TypeVar('T')
# nosa(1): pylint[:Too many instance attributes]
class BaseProgressStream(Generic[T]):
    """Display the progress of a stream."""

    # nosa(1): pylint[:Too many arguments]
    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        si: Callable[[int], Tuple[int, str]],
        progress: Callable[[T], int],
        width: int = 20,
        prefix: str = '',
        start: int = 0,
        message: Optional[str] = None,
    ):
        """
        Initialize BaseProgressStream.

        :param stream: Stream to echo whilst displaying progress.
        :param size: Total size of the stream, or None if unknown -
            without it no progress bar is drawn.
        :param si: Converter making raw amounts human readable.
        :param progress: Returns how much each chunk advances progress.
        :param width: Character width of the progress bar.
        :param prefix: Text printed before the progress bar.
        :param start: Initial progress amount.
        :param message: Optional message printed before streaming starts.
        """
        self.stream = stream
        self.size = size
        self.width = width
        # '=' padding with a single '>' head; slicing any length of its
        # tail always yields a bar ending in '>'.
        self.progress_bar = '=' * (width - 1) + '>'
        self.prefix = prefix
        self.to_readable = si
        self.progress_fn = progress
        self._start = start
        self.message = message

    def _get_progress(self, current: int) -> str:
        """
        Get the progress of the stream.

        :param current: Current progress - not in percentage.
        :return: Progress bar and file size, or '' if size is unknown.
        """
        if not self.size:
            return ''
        amount = self.width * current // self.size
        progress = self.progress_bar[-amount:] if amount else ''
        disp_size = display(self.to_readable(self.size))
        return f'[{progress:<{self.width}}] {disp_size} '

    def __iter__(self) -> Iterator[T]:
        """
        Echo the stream, and update progress.

        Catches all warnings raised whilst processing the stream to be
        displayed afterwards. This keeps the UI tidy and prevents the
        progress bar traveling over multiple lines.

        :return: An echo of the input stream.
        """
        with warnings.catch_warnings(record=True) as warnings_:
            current = self._start
            if self.message:
                print(self.message)
            # time.clock() was removed in Python 3.8; perf_counter() is
            # the documented replacement for interval timing.
            start = time.perf_counter()
            for chunk in self.stream:
                current += self.progress_fn(chunk)
                progress = self._get_progress(current)
                # max(..., 1) avoids dividing by zero in the first second.
                rate = current // max(int(time.perf_counter() - start), 1)
                disp_rate = display(self.to_readable(rate))
                print(
                    f'\r{self.prefix}{progress}{disp_rate}/s',
                    end='',
                    flush=True,
                )
                yield chunk
            print()
        # Replay the warnings captured whilst streaming.
        for warning in warnings_:
            warnings.showwarning(
                warning.message,
                warning.category,
                warning.filename,
                warning.lineno,
            )
class DataProgressStream(BaseProgressStream[T]):
    """Display progress of a data stream."""

    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        width: int = 20,
        prefix: str = '',
        message: Optional[str] = None,
    ):
        """
        Initialize DataProgressStream.

        Progress advances by the byte length of each chunk and is shown
        in binary (KiB/MiB/...) units.
        """
        super().__init__(
            stream,
            size,
            si=Magnitude.ibyte,
            progress=len,
            width=width,
            prefix=prefix,
            start=0,
            message=message,
        )
class ItemProgressStream(BaseProgressStream[T]):
    """Display progress of an item stream."""
    def __init__(
        self,
        stream: Iterator[T],
        size: Optional[int],
        width: int = 20,
        prefix: str = '',
        message: Optional[str] = None,
    ):
        """Initialize ItemProgressStream."""
        super().__init__(
            stream,
            size,
            Magnitude.number,
            # Each yielded item counts as one unit of progress.
            lambda _: 1,
            width,
            prefix,
            # NOTE(review): start is 1 here where DataProgressStream uses
            # 0 - presumably to make the item count 1-based; confirm this
            # is intentional.
            1,
            message,
        )它是一个模仿curl的小型库,它通过使用进度模块显示有关下载的信息。
"""Copy URL."""
import os
import pathlib
from typing import Any
# nosa(1): pylint
import requests
from . import progress
def curl(
path: pathlib.Path,
*args: Any,
**kwargs: Any,
) -> None:
"""
Download file to system.
Provides a progress bar of the file being downloaded and some
statistics around the file and download.
:param path: Local path to save the file to.
:param args&kwargs: Passed to :code:`request.get`.
"""
response = requests.get(*args, stream=True, **kwargs)
response.raise_for_status()
length_ = response.headers.get('content-length')
length = int(length_) if length_ else None
path.parent.mkdir(parents=True, exist_ok=True)
print(f'Downloading: {response.url}')
try:
with path.open('wb') as output:
for chunk in progress.DataProgressStream(
response.iter_content(chunk_size=512),
length,
prefix=' ',
):
output.write(chunk)
except BaseException:
os.remove(path)
raise公开一个非常简单的缓存。返回的对象都公开了一个ensure方法,以允许动作在以后发生,这意味着缓存可以依赖于另一个缓存。因此,如果不存在所需的7z存档,则存档可以依赖于文件下载程序。
"""
Simple file cache.
Exposes two forms of cache:
1. A file that is downloaded from a website.
2. A 7z archive cache - files that are extracted from a 7z archive.
"""
import pathlib
# nosa(1): pylint,mypy
import py7zlib
from . import curl, si
class CacheMethod:
    """Base cache object."""

    def __init__(self, cache_path: pathlib.Path) -> None:
        """
        Initialize CacheMethod.

        :param cache_path: Location the cached data should live at.
        """
        self.cache_path = cache_path

    def _is_cached(self, use_cache: bool) -> bool:
        """
        Check if the target exist in the cache.

        :param use_cache: Set to false to force redownload the data.
        :return: True if we should use the cache.
        """
        if not use_cache:
            return False
        return self.cache_path.exists()

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        This should be overwritten in child classes.

        :param use_cache: Set to false to force redownload the data.
        :return: Location of file.
        """
        raise NotImplementedError('Should be overwritten in subclass.')
class FileCache(CacheMethod):
    """Exposes a cache that allows downloading files."""

    def __init__(self, cache_path: pathlib.Path, url: str) -> None:
        """
        Initialize FileCache.

        :param cache_path: Location to store the downloaded file at.
        :param url: Remote location to fetch the file from.
        """
        super().__init__(cache_path)
        self.url = url

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        This curls the file from the web to cache, providing a progress
        bar whilst downloading.

        :param use_cache: Set to false to force redownload the data.
        :return: Location of file.
        """
        cached = self._is_cached(use_cache)
        if not cached:
            curl.curl(self.cache_path, self.url)
        return self.cache_path
class Archive7zCache(CacheMethod):
    """Exposes a cache that allows unzipping 7z archives."""

    def __init__(
        self,
        cache_path: pathlib.Path,
        archive_cache: CacheMethod,
    ) -> None:
        """
        Initialize Archive7zCache.

        :param cache_path: Location of the wanted extracted file.
        :param archive_cache: Cache providing the 7z archive itself.
        """
        super().__init__(cache_path)
        self.archive_cache = archive_cache

    def ensure(self, use_cache: bool = True) -> pathlib.Path:
        """
        Ensure target file exists.

        Unzips the 7z archive showing the name and size of each file
        being extracted.

        :param use_cache: Set to false to force reunarchiving of the data.
        :return: Location of file.
        """
        if not self._is_cached(use_cache):
            # Delegating to the upstream cache means a missing archive
            # is fetched on demand before extraction.
            with self.archive_cache.ensure(use_cache).open('rb') as input_file:
                print(f'Unziping: {input_file.name}')
                archive = py7zlib.Archive7z(input_file)
                directory = self.cache_path.parent
                directory.mkdir(parents=True, exist_ok=True)
                # NOTE(review): member names are used as-is; a crafted
                # archive with '../' names could escape the cache
                # directory - confirm archives are trusted.
                for name in archive.getnames():
                    output = directory / name
                    # Look the member up once; it was previously fetched
                    # twice (a linear scan each time in py7zlib).
                    member = archive.getmember(name)
                    size = si.display(si.Magnitude.ibyte(member.size))
                    print(f' Unpacking[{size}] {name}')
                    with output.open('wb') as output_file:
                        output_file.write(member.read())
        return self.cache_path
class Cache:
    """Interface to make cache instances."""
    def __init__(self, cache_dir: pathlib.Path) -> None:
        """Initialize Cache."""
        # Root directory every cache entry is stored under.
        self.cache_dir = cache_dir
    def file(self, cache_path: str, url: str) -> FileCache:
        """
        Get a file cache endpoint.

        :param cache_path: Location of file relative to the cache directory.
        :param url: URL location of the file to download from if not cached.
        :return: A file cache endpoint.
        """
        return FileCache(self.cache_dir / cache_path, url)
    def archive_7z(
        self,
        cache_path: pathlib.Path,
        archive_cache: CacheMethod,
    ) -> Archive7zCache:
        """
        Get an archive cache endpoint.

        :param cache_path: Location of file relative to the cache directory.
        :param archive_cache: A cache endpoint to get the 7z archive from.
        :return: An archive cache endpoint.
        """
        # NOTE(review): annotated pathlib.Path here but str in `file` -
        # both work with the `/` join; consider unifying the annotations.
        return Archive7zCache(self.cache_dir / cache_path, archive_cache)这将Sphinx partial_xref对象扩展为完整的urls。这是因为您可以使用[link description](/q/1)链接到帖子。第二,Simon曾经没有将一个示例链接包装在后面,否则会导致代码爆炸,所以我们也会处理这个问题。[link description](target)就是一个例子。
"""Expand partial xrefs."""
# nosa: pylint,mypy
from typing import List, Type
import docutils.core
import docutils.nodes
import docutils.parsers
import docutils.transforms
from recommonmark.parser import CommonMarkParser
import sphinx.addnodes
# Only the factory is public; the parser classes live inside it.
__all__ = [
    'custom_parser',
]
def custom_parser(prefix: str) -> Type[docutils.parsers.Parser]:
    """
    Markdown parser with partial xref support.

    Extends :code:`recommonmark.parser.CommonMarkParser` with to include
    the :code:`custom_parser.PendingXRefTransform` transform.

    :param prefix: Http base to prepend to partial hyperlinks.
    :return: A custom parser to parse Markdown.
    """
    class PendingXRefTransform(docutils.transforms.Transform):
        """
        Expands partial links.

        Some links are provided like :code:`[text](/a/2)`. This expands
        the link to include the basename like
        :code:`http://codereview.meta.stackexchange.com`.
        """

        # High priority so this runs after the other transforms have
        # produced the pending_xref nodes.
        default_priority = 999

        @staticmethod
        def handle_xref(
            node: sphinx.addnodes.pending_xref,
        ) -> docutils.nodes.Node:
            """Convert partial_xref to desired output."""
            referance, = node.children
            ref = node.attributes['reftarget']
            # Diagnostic only - the two attributes are expected to agree.
            if ref != referance.attributes['refuri']:
                print(
                    'target not the same',
                    node.attributes['reftarget'],
                    referance.attributes['refuri'],
                )
            if ref.startswith('/'):
                # Site-relative link: expand with the configured base.
                referance['refuri'] = prefix + ref
                return referance
            # Handles 'links' like [this other thing](link)
            text, = referance.children
            if not isinstance(text, docutils.nodes.Text):
                print('Referance text is not text.')
            # Unknown target: render the raw Markdown source as text.
            return docutils.nodes.Text(f'[{text.rawsource}]({ref})')

        def traverse(self, node: docutils.nodes.Node) -> None:
            """Traverse the tree updating partial_xref nodes."""
            transforms = []
            children = []
            # Iterate a copy; replacements happen after the walk so the
            # list being iterated is not mutated underneath us.
            for child in getattr(node, 'children', [])[:]:
                if isinstance(child, sphinx.addnodes.pending_xref):
                    new_child = self.handle_xref(child)
                    transforms.append((child, new_child))
                    child = new_child
                children.append(child)
            replace = getattr(node, 'replace', None)
            if replace is not None:
                for old, new in transforms:
                    replace(old, new)
            for child in children:
                self.traverse(child)

        def apply(self) -> None:
            """Docutils entry."""
            self.traverse(self.document)

    class CustomParser(CommonMarkParser):
        """Subclass of CommonMark to add XRef transform."""

        def get_transforms(self) -> List[Type[docutils.transforms.Transform]]:
            """Get transformations used for this parser."""
            return [PendingXRefTransform]
    return CustomParser主函数coroutine增加了很多魔力,这在代码中有更好的描述。在大多数情况下,除非控制流进入一个糟糕的状态,否则它不会做太多事情。
"""
Coroutine helpers.
A lot of this module is based on the assumption that Python doesn't
seamlessly handle the destruction of coroutines when using multiplexing
or broadcasting. It also helps ease interactions when coroutines enter
closed states prematurely.
"""
import functools
import itertools
import types
from typing import (
Any, Callable, Generator, Iterable, Iterator, List, Optional, Tuple, Union,
)
# Sentinel sent into a magic coroutine when a new upstream source appears.
NEW_SOURCE = object()
# Sentinel sent into a magic coroutine when an upstream source exits.
EXIT = object()
# Anything an iterator can be obtained from.
IIter = Union[Iterator, Iterable]
class CoroutineDelegator:
    """Helper class for delegating to coroutines."""

    # Pending (source, target) pairs queued by send_to().
    _queue: List[Tuple[IIter, Generator]]

    def __init__(self) -> None:
        """Initialize CoroutineDelegator."""
        self._queue = []

    def send_to(
        self,
        source: IIter,
        target: Generator,
    ) -> None:
        """
        Add a source and target to send data to.

        This does not send any data into the target, to do that use the
        :meth:`CoroutineDelegator.run` function.

        :param source: Input data, can be any iterable. Each is passed
            straight unaltered to target.
        :param target: This is the coroutine the data enters into to get
            into the coroutine control flow.
        """
        self._queue.append((source, target))

    def _increment_coroutine_refs(self) -> None:
        """Increment the amount of sources for the coroutines."""
        # Each magic coroutine keeps a source counter so it knows when
        # all of its providers are done.
        for _, target in self._queue:
            if _is_magic_coroutine(target):
                target.send(NEW_SOURCE)

    def _run(self, source: IIter, target: Generator) -> Optional[Iterator]:
        """
        Feed every item of ``source`` into ``target``.

        :return: None on success; if the target closed prematurely, an
            iterator over the source items not yet consumed.
        """
        # Sentinel distinguishes "no item sent yet" from any real item.
        item = sentinel = object()
        source_ = iter(source)
        try:
            for item in source_:
                target.send(item)
        except StopIteration:
            # Target closed early: hand back what it didn't consume.
            if item is sentinel:
                return source_
            # The in-flight item was never processed; keep it too.
            return itertools.chain([item], source_)
        else:
            # Source exhausted normally: tell the coroutine this source
            # is finished.
            if _is_magic_coroutine(target):
                target.send(EXIT)
            return None

    def run(self) -> List[Iterator]:
        """
        Send all data into the coroutine control flow.

        :return: If a coroutine is closed prematurely the data that
            hasn't been entered into the control flow will be
            returned. Otherwise an empty list is.
        """
        self._increment_coroutine_refs()
        output: List[Optional[Iterator]] = [
            None for _ in range(len(self._queue))
        ]
        for i, (source, target) in enumerate(self._queue):
            output[i] = self._run(source, target)
        self._queue = []
        # Any leftover iterator means a target closed early; normalize
        # the Nones to empty iterators for a consistent return shape.
        if any(output):
            return [iter(o or []) for o in output]
        return []
def primed_coroutine(function: Callable[..., Generator]) -> Callable:
    """
    Primes a coroutine at creation.

    :param function: A coroutine function.
    :return: The coroutine function wrapped to prime the coroutine at creation.
    """
    function = types.coroutine(function)

    # functools.wraps preserves the wrapped function's metadata
    # (__name__, __doc__, ...), consistent with `coroutine` below.
    @functools.wraps(function)
    def inner(*args: Any, **kwargs: Any) -> Generator:
        output = function(*args, **kwargs)
        # Advance to the first yield so callers can send() immediately.
        next(output)
        return output
    return inner
def _is_magic_coroutine(target: Any) -> bool:
    """
    Check if target is a magic coroutine.

    Magic coroutines are generators produced by :func:`coroutine` and
    are identified by their generator function's qualified name.

    :param target: An object to check against.
    :return: If the object is a magic coroutine.
    """
    try:
        return bool(
            target
            and target.__qualname__.endswith('coroutine.<locals>.magic'),
        )
    # Only a missing __qualname__ (or one without .endswith) is
    # expected; the previous bare `except Exception` hid real bugs.
    except AttributeError:
        return False
def coroutine(function: Callable) -> Callable:
    """
    Wrap a coroutine generating function to make magic coroutines.

    A magic coroutine is wrapped in a protective coroutine that eases
    the destruction of coroutine pipelines. This is because the
    coroutine is wrapped in a 'bubble' that:

    1. Primes the coroutine when the first element of data is passed to it.
    2. Sends information about the creation and destruction of other
       coroutines in the pipeline. This allows a coroutine to destroy
       itself when all providers have exited.
    3. Handles when a coroutine is being prematurely closed, if this is
       the case all target coroutines will be notified that some data
       sources are no longer available allowing them to deallocate
       themselves if needed.
    4. Handles situations where a target coroutine has been prematurely
       closed. In such a situation the current coroutine will be closed
       and exit with a StopIteration error, as if the coroutine has been
       closed with the :code:`.close`.

    It should be noted that these coroutine pipelines should be started via the
    :class:`stack_exchange_graph_data.helpers.coroutines.CoroutineDelegator`.
    This is as it correctly initializes the entry coroutine, and handles
    when the coroutine has been prematurely closed.

    :param function: Standard coroutine generator function.
    :return: Function that generates magic coroutines.
    """
    # NOTE(review): `self` is written by `inner` but never read; the
    # annotation only exists so `nonlocal self` is legal. Presumably it
    # keeps a reference to the last created coroutine - confirm whether
    # it can be removed.
    self: Generator

    @primed_coroutine
    def magic(*args: Any, **kwargs: Any) -> Generator:
        # Get magic coroutine targets
        targets_ = itertools.chain(args, kwargs.values())
        targets = [
            t
            for t in targets_
            if _is_magic_coroutine(t)
        ]
        # Create wrapped coroutine
        wrapped = function(*args, **kwargs)
        # Broadcast the creation of a new source to the targets
        for target in targets:
            target.send(NEW_SOURCE)
        sources = 0
        generator_exit_flag = False
        generator_iteration_flag = False
        active = False
        try:
            # Main coroutine loop handles adding and removing source counters.
            while True:
                item = yield
                if item is NEW_SOURCE:
                    sources += 1
                elif item is EXIT:
                    sources -= 1
                    # All providers have exited; shut down cleanly.
                    if not sources:
                        break
                else:
                    # Allows coroutines to be uninitialized until
                    # they're needed to be active.
                    if not active:
                        next(wrapped)
                        active = True
                    wrapped.send(item)
        # Raised when a anything above parent has been killed
        except RuntimeError:
            pass
        # Raised when a parent has been killed
        except StopIteration:
            generator_iteration_flag = True
        # Raised when this is being killed via `.close`.
        except GeneratorExit:
            generator_exit_flag = True
        finally:
            # Close the wrapped coroutine
            # This happens first, so any code in a `finally` can
            # propagate correctly
            try:
                wrapped.close()
            except RuntimeError:
                pass
            # Decrement target coroutine's source counters
            if targets and not generator_iteration_flag:
                for target in targets:
                    try:
                        for _ in range(sources):
                            target.send(EXIT)
                    except StopIteration:
                        pass
            # Coroutine must yield when it's being killed. IDK why but it does.
            # But it's illegal to yield when a GeneratorExit has been raised.
            if not generator_exit_flag:
                yield

    @functools.wraps(function)
    def inner(*args: Any, **kwargs: Any) -> Generator:
        nonlocal self
        self = magic(*args, **kwargs)
        return self
    return inner
@coroutine
def broadcast(*targets: Generator) -> Generator:
    """Fan each received item out to every target coroutine."""
    while True:
        received = yield
        for sink in targets:
            sink.send(received)
@coroutine
def file_sink(*args: Any, **kwargs: Any) -> Generator:
    """
    Send all data to a file.

    ``args``/``kwargs`` are passed straight to :func:`open`; the file
    is closed when the coroutine is destroyed.
    """
    with open(*args, **kwargs) as file_obj:
        while True:
            file_obj.write((yield))发布于 2019-08-29 00:19:52
你在存储这样的震级:
_MAGNITUDE = 'f p n μ m| k M G T P E Z Y'这是一种需要解析的序列化格式。这是不方便的,并使您的代码复杂化。只需存储元组或字典,就可以存储前缀字符串及其大小。有几种选择是:
(
('f', -15),
('p', -12),
('n', -9),
# ...
(
'mμnpf', # Negative prefixes
'kMGTPEZY' # Positive prefixes
)否则……哇,这可是很多代码啊。它写得不算差,但对于这样一种数据处理工具来说,我认为它正在遭受一些功能膨胀的折磨。coroutine 实现很有趣,但是您并不需要这一大堆代码——看到任何被描述为"神奇"(magic)的东西都会让我感到非常沮丧,特别是在 Python 中。例如,对于泛型广播,您可以只存储可迭代的函数引用。
https://codereview.stackexchange.com/questions/227035
复制相似问题