首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将页面合并到附加日志中

将页面合并到附加日志中
EN

Code Review用户
提问于 2021-06-14 18:18:53
回答 1查看 75关注 0票数 3

Situation

我正在从一个网站的分页列表中抓取数据,作为通知发送出去。在大多数情况下,我们只需要对第一页进行造型,并处理添加、删除和重复的内容。

当添加一个包含我们以前从未见过的项的页面时,我们只需将页面存储在内存中。

代码语言:javascript
复制
local:  
scrape: A B C C D E

输出

代码语言:javascript
复制
A B C C D E

当添加带有我们以前见过的项目的页面时。

  • 添加: 所有的添加将被记录到仅附加的日志中。我还希望向我和其他人发送通知。因此,我还将返回新的数据,以便调用代码可以与其他系统集成。
  • 删除: 删除是非常罕见的,我希望在这里编写代码。删除只是意味着我们假设拥有更多数据的一方是正确的。因此,在编程上,处理方式与添加类似-我们假设拥有更多数据的一方是正确的。
  • 重复:日志和刚刮过的页面之间的重复条目非常常见。我们不希望将副本添加到日志中,除非双方都有副本。

例如,假设日志中和新的擦伤中有以下项:

代码语言:javascript
复制
local:  C C D E
scrape: A B C C E

我们希望产出如下:

代码语言:javascript
复制
A B C C D E

注意,在新的刮取上,数据是如何更改的。添加了AB,删除了D,我们将数据规范化为2,而不是4,Cs和1,而不是2,Es,以及所有数据是如何排列一致的。

此外,该列表还有几个附加属性:

  • 新项目总是放在列表开头的前面。我编写的代码可以很好地处理预置列表和附加列表。通过从“头”到“尾”的旅行,对数据的任何添加将意味着我们在下一页得到一个重复的值。例如。如果列表有两页,并且在我们刮掉第一页之后添加了一个新项目:第1页:第2页,F,D,C,B,A,我们刮了FEDC,还有一个新的项目G被添加到列表中。第1页,第2页,G,F,D,D,C,B,A,你可以看到我们又刮了C。然而,如果我们反对名单,我们可能会错过C。
  • 项目极不可能被删除。删除项会导致列表向相反的方向移动。因此,我们遇到了我们之前的问题,通过与名单的旅行。第1页,第2页,F,D,C,B,A,我们刮掉了FEDC,E项被删除。第1页,第2页,F,D,C,B,A,如你所见,我们可以在B上输掉。

假设我们刮掉整个分页列表(默认情况下不是代码所做的)。与任何其他页面没有重叠的每个页面都将存储在NotificationLog.values中。虽然我们可以假设每个页面从下一个页面继续进行,但在默认情况下代码并不是这样的。因为对分页列表的添加或删除可以从列表中删除项,这取决于我们如何遍历数据。我决定错误地强迫使用者显式地假定每个页面都是连接的--在上一页中添加了一个项。因此,我们可以在任何方向刮掉列表,而不需要考虑页面是如何发生变异的。如果我们等到添加或删除发生时,那么我们可以第二次对每个页面进行赋值,以确保在两个页面之间有任何和所有的值。所以所有的东西都会通过页面之间的重叠连接在一起。

然而,附加是非常常见的,删除是非常非常罕见的。所以我决定(在课堂之外)撒谎,说我得到了上一页的一个值,作为刮伤的一部分。所以我们只需要对每一页进行一次解析。

我编写了以下代码来将页面存储在内存中,然后登录到文件中。我认为代码真的很复杂,可以清理掉。

注意:代码是99.9%的稳定,但不是100%的稳定。在下面提供的测试test_zip_values__same_tail中,我们可以看到输出是tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(0,))而不是稳定的tuplize([1, 2, 3, 4, 5, 6, 7], suffix=(0,))。这种情况非常非常罕见,所以我暂时跳过了实现二级双稳定合并。请跳过去。

代码语言:javascript
复制
from __future__ import annotations

import os
from typing import IO, Iterator, Optional, TypeVar

from .log import LogValues

TTuple = TypeVar("TTuple", bound=tuple)  # type: ignore
AValues = list[list[TTuple]]


class NotificationLog(LogValues[TTuple]):
    @staticmethod
    def _read_last_line(file: IO[bytes]) -> bytes:
        file.seek(-2, os.SEEK_END)
        while (
            file.read(1) != b"\n"
            and file.seek(-2, os.SEEK_CUR)
        ):
            pass
        return file.readline()

    @classmethod
    def _read_values(cls, bytes: IO[bytes], encoding: str = "utf-8") -> AValues[TTuple]:
        ret = cls._read_last_line(bytes).decode(encoding=encoding).rstrip("\n")
        if not ret:
            return []
        else:
            return [
                [
                    tuple(ret.split(","))  # type: ignore
                ]
            ]

    def add_values(self, values: list[TTuple], index: int = -1) -> Optional[list[TTuple]]:
        """
        Add values into the log.

        The values should be a page from a paginate list.
        Any values added to the log we will be returned,
        otherwise ``None`` will be returned.

        ..  note::

            For best performance the paginated list should:

            -   prepend new items to the start of the list, and
            -   have low volatility - few/no deletes.

            And you should add one value from the previous page in ``values``.
        """
        values = list(self.zip_values(iter(values), iter(self.values), index))
        had_values = bool(self.values)
        self.values = list(self._add_values(values, iter(self.values)))
        if values is not self.values[-1]:
            return None
        if had_values:
            values = values[-2::-1]
        for line in values:
            print(*line, sep=",", file=self.file)
        return values

    def _add_values(self, adds: list[TTuple], memories: Iterator[list[TTuple]], index: int = -1) -> Iterator[list[TTuple]]:
        """
        Add new memory into memories.

        ``adds`` will be run through :meth:`zip_values` beforehand.
        As such, all existing memories which overlapped with ``adds`` will be included in ``adds``.
        We need to; remove all memories which overlap ``add``,
        and insert ``adds`` in the correct position.
        """
        add = adds[0]
        try:
            mems = next(memories)
            while add[index] < mems[0][index]:
                yield mems
                mems = next(memories)
            while mems[0] in adds:
                mems = next(memories)
        except StopIteration:
            yield adds
        else:
            yield adds
            yield mems
            yield from memories

    @staticmethod
    def zip_values(adds: Iterator[TTuple], memories: Iterator[list[TTuple]], index: int = -1) -> Iterator[TTuple]:
        """
        Merge new values and pages together.

        Memories are descendingly ordered pages of values.
        The ``index``, defaulting to the last, is used to determine the order.
        The pages in memories are pages returned from a paginated list.
        Many sites use paginated lists, such as Google.
        Sometimes sites expose an 'infinate scroll',
        where the loads the next page automatically through JavaScript.

        Adds should be an ordered page to add to the log.
        If there are no overlaps between the ``adds`` and ``memories``
        then a new memory is added.
        Effectively adding the page as is into ``memories`` in the correct place.

        Once ``add`` reaches the final memory (the last value in the log),
        any additional data is ignored.
        The log should be opened in append mode and so we can only write to the end.
        :class:`NotificationLog` is not suitable for lists with random insertions.

        In prepended and low volatility lits, to merge pages into the log ASAP;
        always include one value from the previous page.
        The code is designed to correctly handle deleted data,
        from both ``add`` and ``memories``.
        As such the previous value can be the first, second, ..., or last value.

        ..  warn::

            If:
            
            -   new items to the list are **appended** to the **end** of the list, or
            -   if the list is highly susceptible to item deletion.

            **Do not** include a value from the previous page.
            Doing so can cause _data loss_ and will require an additional tool to recover from.

        """
        def take_same_index(start: TTuple, it: Iterator[TTuple]) -> tuple[Optional[TTuple], list[TTuple]]:
            values = [start]
            for value in it:
                if value[index] != start[index]:
                    return value, values
                else:
                    values.append(value)
            return None, values

        add: Optional[TTuple]
        mem: Optional[TTuple]
        try:
            add = next(adds)
        except StopIteration:
            return
        while True:
            try:
                mems_ = next(memories)
            except StopIteration:
                yield add
                yield from adds
                return
            if mems_[-1][index] <= add[index]:
                break
        mems = iter(mems_)
        while True:
            try:
                mem = next(mems)
                while mem[index] < add[index]:
                    yield add
                    add = next(adds)
            except StopIteration:
                return
            try:
                while True:
                    if mem[index] == add[index]:
                        if mem == add:
                            yield add
                            try:
                                add = next(adds)
                            except StopIteration:
                                add = None
                                raise
                            mem = next(mems)
                        else:
                            add, _adds = take_same_index(add, adds)
                            mem, _mems = take_same_index(mem, mems)
                            # TODO: Replace with a bi-stable algorithm
                            # Probably never going to need a bi-stable algorithm
                            yield from _mems
                            for _add in _adds:
                                if _add not in _mems:
                                    yield _add
                            if mem is None:
                                return
                            if add is None:
                                yield mem
                                raise StopIteration()
                    elif mem[index] < add[index]:
                        yield add
                        try:
                            add = next(adds)
                        except StopIteration:
                            add = None
                            yield mem
                            raise
                    else:
                        yield mem
                        mem = next(mems)
            except StopIteration:
                yield from mems
                if add is None:
                    return
            try:
                mems = iter(next(memories))
            except StopIteration:
                return

测试

我有一个相当全面的测试套件。

代码语言:javascript
复制
import io
from typing import Any

import pytest

from loglet import NotificationLog

VALUES = [
    ("1", "081041"),
    ("5", "081324"),
    ("5", "081327"),
]
STRING = "".join(
    ",".join(values) + "\n"
    for values in VALUES
)
BYTES = STRING.encode('utf-8')


@pytest.fixture
def log():
    return NotificationLog.from_streams(
        io.StringIO(STRING),
        io.BytesIO(BYTES),
    )


@pytest.fixture
def log_empty():
    return NotificationLog(
        io.StringIO(""),
        [],
    )


def test_read_last_line__empty_log():
    output = NotificationLog._read_last_line(io.BytesIO(b""))
    assert output == b""


def test_read_last_line__one_no_newline():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo"))
    assert output == b"foo"


def test_read_last_line__one_log():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo\n"))
    assert output == b"foo\n"


def test_read_last_line__two_log():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo\nbar\n"))
    assert output == b"bar\n"


def test_read_values__empty_log():
    output = NotificationLog._read_values(io.BytesIO(b""))
    assert output == []


def test_read_values__empty_newline_log():
    output = NotificationLog._read_values(io.BytesIO(b"\n"))
    assert output == []


def test_read_values__one_log():
    output = NotificationLog._read_values(io.BytesIO(b"foo\n"))
    assert output == [[("foo",)]]


def test_read_values__two_log():
    output = NotificationLog._read_values(io.BytesIO(b"foo\nbar\n"))
    assert output == [[("bar",)]]


def tuplize(values, *, prefix: tuple[Any, ...] = (), suffix: tuple[Any, ...] = ()) -> list[tuple[Any, ...]]:
    try:
        return [
            prefix
            + (
                value
                if isinstance(value, tuple) else
                (value,)
            )
            + suffix
            for value in values
        ]
    except TypeError:
        return [prefix + (values,)]


def test_zip_values__empty_adds():
    output = list(NotificationLog.zip_values(
        iter(tuplize([])),
        iter([
            tuplize(3),
        ]),
    ))
    assert output == tuplize([])


def test_zip_values__empty_mems():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([]),
    ))
    assert output == tuplize(range(9, -1, -2))


def test_zip_values__basic_final_extra_add_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(3),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3])


def test_zip_values__basic_final_extra_add_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 4])


def test_zip_values__basic_final_extra_mems_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([5, 3, 1]),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_final_extra_mems_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([6, 4, 2, 0]),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4, 2, 0])


def test_zip_values__basic_stem_extra_add_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(3),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_stem_extra_add_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(4),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 4, 3, 1])


def test_zip_values__basic_stem_extra_mems_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([5, 3, 1]),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_stem_extra_mems_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([6, 4, 2, 0]),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4, 2, 0])


def test_zip_values__basic_multiple_equal():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(7),
            tuplize(5),
            tuplize(3),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3])


def test_zip_values__basic_multiple_unequal():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(8),
            tuplize(6),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 8, 7, 6, 5, 4])


def test_zip_values__basic_multiple_chunk_aligned():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([7, 6, 5]),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4])


def test_zip_values__basic_multiple_chunk_misaligned():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([8, 7, 6]),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 8, 7, 6, 5, 4])


def test_zip_values__basic_multiple_extra():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-10),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize(range(9, -1, -2))


def test_zip_values__basic_head_overlap():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([10, 9, 8]),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([10, 9, 8, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(20),
            tuplize([10, 9, 8]),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([10, 9, 8, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__same_tail():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(0,)),
        ]),
    ))
    assert output == tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(0,))


def test_zip_values__same_tail_add():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([0], suffix=(0,))
    )


def test_zip_values__same_mid():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([0], suffix=(0,))
    )


def test_zip_values__same_mid():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([1], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([1, 0], suffix=(0,))
    )


def test__add_values__head_mem(log: NotificationLog):
    adds = tuplize([3, 2, 1])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )
    assert mems[-1] is adds


def test__add_values__overwrite_mem(log: NotificationLog):
    adds = tuplize([3, 2, 1])
    mems = list(log._add_values(adds, iter([adds])))
    assert mems == [adds]
    assert mems[-1] is adds


def test__add_values__overwrite_mems(log: NotificationLog):
    adds = tuplize([6, 5, 4])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )


def test__add_values__tail_mem(log: NotificationLog):
    adds = tuplize([9, 8, 7])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )


HEAD_BUG_VALUES = [
    ("2", "181243"),
    ("2", "181241"),
    ("5", "181215"),
    ("5", "181201"),
    ("4", "181141"),
    ("4", "181141"),
    ("1", "180951"),
    ("1", "180948"),
    ("5", "172330"),
    ("7", "171721"),
    ("5", "171437"),
    ("5", "171211"),
    ("4", "170758"),
    ("4", "170757"),
    ("5", "162042"),
    ("6", "161510"),
    ("6", "161509"),
    ("6", "161507"),
    ("6", "161506"),
    ("6", "161505"),
    ("6", "161505"),
    ("5", "161347"),
    ("5", "161346"),
    ("4", "161155"),
    ("4", "161155"),
    ("1", "160744"),
    ("5", "151239"),
    ("7", "150418"),
    ("4", "141945"),
    ("4", "141944"),
    ("5", "141632"),
    ("9", "141532"),
    ("4", "141219"),
    ("5", "141149"),
    ("8", "141115"),
    ("8", "141114"),
    ("8", "141114"),
    ("8", "141113"),
    ("8", "141113"),
    ("8", "141113"),
    ("8", "141112"),
    ("8", "141112"),
    ("5", "132307"),
    ("5", "132303"),
    ("3", "132039"),
    ("3", "132039"),
    ("3", "132039"),
    ("4", "131502"),
    ("4", "131501"),
]


def test_add_values__head_bug(log: NotificationLog):
    assert log.add_values(HEAD_BUG_VALUES) is None


_CONNECTED_VALUES = VALUES + [
    ("5", "082047"),
    ("5", "082312"),
]


def test_add_values__connected_values(log: NotificationLog):
    assert (
        log.add_values(_CONNECTED_VALUES)
        == [
            ('5', '082047'),
            ('5', '081327'),
            ('5', '081324'),
            ('1', '081041'),
        ]
    )


def test_zip_values__head_bug(log: NotificationLog):
    assert list(log.zip_values(iter(HEAD_BUG_VALUES), iter([VALUES]))) == HEAD_BUG_VALUES
EN

回答 1

Code Review用户

发布于 2021-06-14 20:16:50

_read_last_line很聪明,但速度会很慢。最好是阅读缓冲区块,比如1k,并执行像.rindex这样的字符串操作。对于大多数类型的日志文件,这几乎肯定会在第一次尝试时包含一个换行符。一旦找到换行符,就不需要发出另一个readline();只需对缓冲区字符串进行切片即可。

你的类型暗示付出了很大的努力,但出于几个原因是值得怀疑的。你把# type: ignore洒得到处都是,这意味着有什么东西(比如?)不想吞下你的暗示。提示本身是奇怪的- _read_values返回字符串元组列表,但您的提示是泛型参数类型的元组列表。为什么,如果它们永远都是字符串?

代码语言:javascript
复制
        mems = next(memories)
        while add[index] < mems[0][index]:
            yield mems
            mems = next(memories)

be

代码语言:javascript
复制
for mems in memories:
    if add[index] >= mems[0][index]:
        break
    yield mems

?避免手动next通常更好。这将避免撞上StopIteration错误。您可能需要添加一个for/else来捕获丢失的-break

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/263038

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档