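I am scraping data from a paginated list on a website so that I can send it out as notifications. In most cases we only need to scrape the first page and handle additions, deletions and duplicates.
When adding a page that contains only items we have never seen before, we simply store the page in memory.
local:
scrape: A B C C D E
Output:
A B C C D E
When adding a page with items we have seen before, the page has to be merged with what is already stored.
For example, say the log and the new scrape contain the following items:
local: C C D E
scrape: A B C C E
We would want the following output:
A B C C D E
Notice how the data changed in the new scrape. A and B were added, D was removed, we normalize the data to 2 Cs rather than 4 and to 1 E rather than 2, and all of the data is ordered consistently.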
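One way to read the expected output above is as a multiset union: each item appears as many times as the larger of its two counts. The sketch below only illustrates that reading of the example. `merge_example` is a hypothetical name, and sorting the result is an assumption that happens to fit the letters used here; the real code walks ordered pages and never sorts.

```python
from collections import Counter
from typing import List


def merge_example(local: List[str], scrape: List[str]) -> List[str]:
    """Illustrative only: keep each item max(count in local, count in scrape) times."""
    # Counter's | operator takes the per-item maximum of the two counts.
    merged = Counter(local) | Counter(scrape)
    # Assumption: the items sort into the list order (true for A..E above).
    return sorted(merged.elements())


print(merge_example(list("CCDE"), list("ABCCE")))
```

With the inputs from the example this keeps two Cs and one E, and D survives even though the new scrape no longer contains it.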
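The list also has a couple of additional properties:
Say we scrape the entire paginated list (which is not what the code does by default). Each page that has no overlap with any other page will be stored in NotificationLog.values. Whilst we could assume each page continues on from the next, the code does not do so by default, because additions to or deletions from a paginated list can cause items to be missed, depending on how we traverse the data. I decided to err on the side of forcing the user to explicitly state that each page is connected, by including an item from the previous page. As such we can scrape the list in any direction without having to think about how the pages mutate. If we wait until an addition or deletion has happened, we can scrape each page a second time to ensure we have any and all values between two pages. Everything will then be connected through the overlaps between pages.
However, additions are very common and deletions are very, very rare. So I decided (outside of the class) to lie and say I got one value from the previous page as part of the scrape. That way we only need to parse each page once.
I wrote the following code to store the pages in memory and then log them to a file. I think the code is really complicated and could be cleaned up.
Note: the code is 99.9% stable, but not 100% stable. In the test test_zip_values__same_tail provided below we can see the output is tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(0,)) rather than the stable tuplize([1, 2, 3, 4, 5, 6, 7], suffix=(0,)). This situation is very, very rare, so for now I have skipped implementing a second-level bi-stable merge. Please skip over this.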
from __future__ import annotations

import os
from typing import IO, Iterator, Optional, TypeVar

from .log import LogValues

TTuple = TypeVar("TTuple", bound=tuple)  # type: ignore
AValues = list[list[TTuple]]


class NotificationLog(LogValues[TTuple]):
    @staticmethod
    def _read_last_line(file: IO[bytes]) -> bytes:
        file.seek(-2, os.SEEK_END)
        while (
            file.read(1) != b"\n"
            and file.seek(-2, os.SEEK_CUR)
        ):
            pass
        return file.readline()

    @classmethod
    def _read_values(cls, bytes: IO[bytes], encoding: str = "utf-8") -> AValues[TTuple]:
        ret = cls._read_last_line(bytes).decode(encoding=encoding).rstrip("\n")
        if not ret:
            return []
        else:
            return [
                [
                    tuple(ret.split(","))  # type: ignore
                ]
            ]

    def add_values(self, values: list[TTuple], index: int = -1) -> Optional[list[TTuple]]:
        """
        Add values into the log.

        The values should be a page from a paginated list.
        Any values added to the log will be returned,
        otherwise ``None`` will be returned.

        .. note::

            For best performance the paginated list should:

            - prepend new items to the start of the list, and
            - have low volatility - few/no deletes.

            And you should add one value from the previous page in ``values``.
        """
        values = list(self.zip_values(iter(values), iter(self.values), index))
        had_values = bool(self.values)
        self.values = list(self._add_values(values, iter(self.values)))
        if values is not self.values[-1]:
            return None
        if had_values:
            values = values[-2::-1]
        for line in values:
            print(*line, sep=",", file=self.file)
        return values

    def _add_values(self, adds: list[TTuple], memories: Iterator[list[TTuple]], index: int = -1) -> Iterator[list[TTuple]]:
        """
        Add new memory into memories.

        ``adds`` will be run through :meth:`zip_values` beforehand.
        As such, all existing memories which overlapped with ``adds`` will be included in ``adds``.
        We need to; remove all memories which overlap ``add``,
        and insert ``adds`` in the correct position.
        """
        add = adds[0]
        try:
            mems = next(memories)
            while add[index] < mems[0][index]:
                yield mems
                mems = next(memories)
            while mems[0] in adds:
                mems = next(memories)
        except StopIteration:
            yield adds
        else:
            yield adds
            yield mems
            yield from memories

    @staticmethod
    def zip_values(adds: Iterator[TTuple], memories: Iterator[list[TTuple]], index: int = -1) -> Iterator[TTuple]:
        """
        Merge new values and pages together.

        Memories are descendingly ordered pages of values.
        The ``index``, defaulting to the last, is used to determine the order.
        The pages in memories are pages returned from a paginated list.
        Many sites use paginated lists, such as Google.
        Sometimes sites expose an 'infinite scroll',
        where the site loads the next page automatically through JavaScript.

        Adds should be an ordered page to add to the log.
        If there are no overlaps between the ``adds`` and ``memories``
        then a new memory is added.
        Effectively adding the page as is into ``memories`` in the correct place.

        Once ``add`` reaches the final memory (the last value in the log),
        any additional data is ignored.
        The log should be opened in append mode and so we can only write to the end.
        :class:`NotificationLog` is not suitable for lists with random insertions.

        In prepended and low volatility lists, to merge pages into the log ASAP;
        always include one value from the previous page.
        The code is designed to correctly handle deleted data,
        from both ``add`` and ``memories``.
        As such the previous value can be the first, second, ..., or last value.

        .. warning::

            If:

            - new items to the list are **appended** to the **end** of the list, or
            - if the list is highly susceptible to item deletion.

            **Do not** include a value from the previous page.
            Doing so can cause _data loss_ and will require an additional tool to recover from.
        """
        def take_same_index(start: TTuple, it: Iterator[TTuple]) -> tuple[Optional[TTuple], list[TTuple]]:
            values = [start]
            for value in it:
                if value[index] != start[index]:
                    return value, values
                else:
                    values.append(value)
            return None, values

        add: Optional[TTuple]
        mem: Optional[TTuple]
        try:
            add = next(adds)
        except StopIteration:
            return

        while True:
            try:
                mems_ = next(memories)
            except StopIteration:
                yield add
                yield from adds
                return
            if mems_[-1][index] <= add[index]:
                break
        mems = iter(mems_)

        while True:
            try:
                mem = next(mems)
                while mem[index] < add[index]:
                    yield add
                    add = next(adds)
            except StopIteration:
                return

            try:
                while True:
                    if mem[index] == add[index]:
                        if mem == add:
                            yield add
                            try:
                                add = next(adds)
                            except StopIteration:
                                add = None
                                raise
                            mem = next(mems)
                        else:
                            add, _adds = take_same_index(add, adds)
                            mem, _mems = take_same_index(mem, mems)
                            # TODO: Replace with a bi-stable algorithm
                            #       Probably never going to need a bi-stable algorithm
                            yield from _mems
                            for _add in _adds:
                                if _add not in _mems:
                                    yield _add
                            if mem is None:
                                return
                            if add is None:
                                yield mem
                                raise StopIteration()
                    elif mem[index] < add[index]:
                        yield add
                        try:
                            add = next(adds)
                        except StopIteration:
                            add = None
                            yield mem
                            raise
                    else:
                        yield mem
                        mem = next(mems)
            except StopIteration:
                yield from mems
                if add is None:
                    return
                try:
                    mems = iter(next(memories))
                except StopIteration:
                    return

I have a fairly comprehensive test suite.
import io
from typing import Any

import pytest

from loglet import NotificationLog

VALUES = [
    ("1", "081041"),
    ("5", "081324"),
    ("5", "081327"),
]
STRING = "".join(
    ",".join(values) + "\n"
    for values in VALUES
)
BYTES = STRING.encode('utf-8')


@pytest.fixture
def log():
    return NotificationLog.from_streams(
        io.StringIO(STRING),
        io.BytesIO(BYTES),
    )


@pytest.fixture
def log_empty():
    return NotificationLog(
        io.StringIO(""),
        [],
    )


def test_read_last_line__empty_log():
    output = NotificationLog._read_last_line(io.BytesIO(b""))
    assert output == b""


def test_read_last_line__one_no_newline():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo"))
    assert output == b"foo"


def test_read_last_line__one_log():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo\n"))
    assert output == b"foo\n"


def test_read_last_line__two_log():
    output = NotificationLog._read_last_line(io.BytesIO(b"foo\nbar\n"))
    assert output == b"bar\n"


def test_read_values__empty_log():
    output = NotificationLog._read_values(io.BytesIO(b""))
    assert output == []


def test_read_values__empty_newline_log():
    output = NotificationLog._read_values(io.BytesIO(b"\n"))
    assert output == []


def test_read_values__one_log():
    output = NotificationLog._read_values(io.BytesIO(b"foo\n"))
    assert output == [[("foo",)]]


def test_read_values__two_log():
    output = NotificationLog._read_values(io.BytesIO(b"foo\nbar\n"))
    assert output == [[("bar",)]]


def tuplize(values, *, prefix: tuple[Any, ...] = (), suffix: tuple[Any, ...] = ()) -> list[tuple[Any, ...]]:
    try:
        return [
            prefix
            + (
                value
                if isinstance(value, tuple) else
                (value,)
            )
            + suffix
            for value in values
        ]
    except TypeError:
        return [prefix + (values,)]


def test_zip_values__empty_adds():
    output = list(NotificationLog.zip_values(
        iter(tuplize([])),
        iter([
            tuplize(3),
        ]),
    ))
    assert output == tuplize([])


def test_zip_values__empty_mems():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([]),
    ))
    assert output == tuplize(range(9, -1, -2))


def test_zip_values__basic_final_extra_add_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(3),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3])


def test_zip_values__basic_final_extra_add_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 4])


def test_zip_values__basic_final_extra_mems_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([5, 3, 1]),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_final_extra_mems_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([6, 4, 2, 0]),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4, 2, 0])


def test_zip_values__basic_stem_extra_add_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(3),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_stem_extra_add_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(4),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 4, 3, 1])


def test_zip_values__basic_stem_extra_mems_eq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([5, 3, 1]),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_stem_extra_mems_neq():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, 4, -2))),
        iter([
            tuplize([6, 4, 2, 0]),
            tuplize(-10),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4, 2, 0])


def test_zip_values__basic_multiple_equal():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(7),
            tuplize(5),
            tuplize(3),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3])


def test_zip_values__basic_multiple_unequal():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(8),
            tuplize(6),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 8, 7, 6, 5, 4])


def test_zip_values__basic_multiple_chunk_aligned():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([7, 6, 5]),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 7, 6, 5, 4])


def test_zip_values__basic_multiple_chunk_misaligned():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([8, 7, 6]),
            tuplize(4),
        ]),
    ))
    assert output == tuplize([9, 8, 7, 6, 5, 4])


def test_zip_values__basic_multiple_extra():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-10),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize(range(9, -1, -2))


def test_zip_values__basic_head_overlap():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize([10, 9, 8]),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([10, 9, 8, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(20),
            tuplize([10, 9, 8]),
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([10, 9, 8, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__basic_head_skip():
    output = list(NotificationLog.zip_values(
        iter(tuplize(range(9, -1, -2))),
        iter([
            tuplize(-11),
        ]),
    ))
    assert output == tuplize([9, 7, 5, 3, 1])


def test_zip_values__same_tail():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(0,)),
        ]),
    ))
    assert output == tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(0,))


def test_zip_values__same_tail_add():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([0], suffix=(0,))
    )


def test_zip_values__same_mid():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([0], suffix=(0,))
    )


def test_zip_values__same_mid():
    output = list(NotificationLog.zip_values(
        iter(
            tuplize([1, 3, 4, 5, 7], suffix=(1,))
            + tuplize([0], suffix=(0,)),
        ),
        iter([
            tuplize([1, 2, 4, 5, 6], suffix=(1,))
            + tuplize([1], suffix=(0,)),
        ]),
    ))
    assert output == (
        tuplize([1, 2, 4, 5, 6, 3, 7], suffix=(1,))
        + tuplize([1, 0], suffix=(0,))
    )


def test__add_values__head_mem(log: NotificationLog):
    adds = tuplize([3, 2, 1])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )
    assert mems[-1] is adds


def test__add_values__overwrite_mem(log: NotificationLog):
    adds = tuplize([3, 2, 1])
    mems = list(log._add_values(adds, iter([adds])))
    assert mems == [adds]
    assert mems[-1] is adds


def test__add_values__overwrite_mems(log: NotificationLog):
    adds = tuplize([6, 5, 4])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )


def test__add_values__tail_mem(log: NotificationLog):
    adds = tuplize([9, 8, 7])
    mems = list(log._add_values(
        adds,
        iter([
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]),
    ))
    assert (
        mems
        == [
            tuplize([9, 8, 7]),
            tuplize([6, 5, 4]),
            tuplize([3, 2, 1]),
        ]
    )


HEAD_BUG_VALUES = [
    ("2", "181243"),
    ("2", "181241"),
    ("5", "181215"),
    ("5", "181201"),
    ("4", "181141"),
    ("4", "181141"),
    ("1", "180951"),
    ("1", "180948"),
    ("5", "172330"),
    ("7", "171721"),
    ("5", "171437"),
    ("5", "171211"),
    ("4", "170758"),
    ("4", "170757"),
    ("5", "162042"),
    ("6", "161510"),
    ("6", "161509"),
    ("6", "161507"),
    ("6", "161506"),
    ("6", "161505"),
    ("6", "161505"),
    ("5", "161347"),
    ("5", "161346"),
    ("4", "161155"),
    ("4", "161155"),
    ("1", "160744"),
    ("5", "151239"),
    ("7", "150418"),
    ("4", "141945"),
    ("4", "141944"),
    ("5", "141632"),
    ("9", "141532"),
    ("4", "141219"),
    ("5", "141149"),
    ("8", "141115"),
    ("8", "141114"),
    ("8", "141114"),
    ("8", "141113"),
    ("8", "141113"),
    ("8", "141113"),
    ("8", "141112"),
    ("8", "141112"),
    ("5", "132307"),
    ("5", "132303"),
    ("3", "132039"),
    ("3", "132039"),
    ("3", "132039"),
    ("4", "131502"),
    ("4", "131501"),
]


def test_add_values__head_bug(log: NotificationLog):
    assert log.add_values(HEAD_BUG_VALUES) is None


_CONNECTED_VALUES = VALUES + [
    ("5", "082047"),
    ("5", "082312"),
]


def test_add_values__connected_values(log: NotificationLog):
    assert (
        log.add_values(_CONNECTED_VALUES)
        == [
            ('5', '082047'),
            ('5', '081327'),
            ('5', '081324'),
            ('1', '081041'),
        ]
    )


def test_zip_values__head_bug(log: NotificationLog):
    assert list(log.zip_values(iter(HEAD_BUG_VALUES), iter([VALUES]))) == HEAD_BUG_VALUES

Posted on 2021-06-14 20:16:50
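_read_last_line is clever, but it will be slow. It would be better to read a buffer chunk of, say, 1k and perform string operations like .rindex on it. For most kinds of log file that chunk will almost certainly contain a newline on the first try. Once the newline is found there is no need to issue another readline(); just slice the buffer.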
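A minimal sketch of that chunked approach, under a few assumptions: `read_last_line` and `chunk_size` are hypothetical names rather than anything in the reviewed class, the stream is seekable, and `bytes.rfind` stands in for `.rindex` so that a missing newline yields `-1` instead of raising.

```python
import io
import os
from typing import IO


def read_last_line(file: IO[bytes], chunk_size: int = 1024) -> bytes:
    """Return the last line of ``file``, keeping its trailing newline if present."""
    file.seek(0, os.SEEK_END)
    pos = file.tell()
    tail = b""
    while pos > 0:
        # Step backwards one chunk at a time and grow the buffer at the front.
        step = min(chunk_size, pos)
        pos -= step
        file.seek(pos)
        tail = file.read(step) + tail
        # A newline that terminates the file belongs to the last line,
        # so only search for a newline before it.
        search_end = len(tail) - 1 if tail.endswith(b"\n") else len(tail)
        index = tail.rfind(b"\n", 0, search_end)
        if index != -1:
            return tail[index + 1:]
    # No earlier newline: the whole file is a single (possibly empty) line.
    return tail


print(read_last_line(io.BytesIO(b"foo\nbar\n")))
```

The loop only runs more than once when the last line is longer than one chunk, so for typical logs this is a single seek and a single read.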
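A lot of effort has gone into your type hints, but they are questionable for a few reasons. You have sprinkled # type: ignore all over the place, which means something (a type checker, presumably) does not want to swallow your hints. The hints themselves are odd: _read_values returns lists of tuples of strings, but your hint says lists of tuples of a generic parameter type. Why, if they will always be strings?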
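If the rows really are always strings, a concrete alias may be all that is needed. The following is only a sketch of that idea: `Row`, `Pages` and `parse_last_line` are hypothetical names, and it assumes the comma-separated line format that `_read_values` already parses.

```python
from typing import List, Tuple

# Hypothetical aliases: a row is a tuple of strings, a page is a list of rows.
Row = Tuple[str, ...]
Pages = List[List[Row]]


def parse_last_line(line: bytes, encoding: str = "utf-8") -> Pages:
    """Parse one comma-separated log line into a single one-row page."""
    text = line.decode(encoding).rstrip("\n")
    if not text:
        return []
    # tuple(list of str) is already a Tuple[str, ...], so no ignore is needed.
    return [[tuple(text.split(","))]]


print(parse_last_line(b"5,081327\n"))
```

Whether the rest of the class can drop its generic parameter as well depends on how `LogValues` is used elsewhere, which is not shown in the question.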
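Can
mems = next(memories)
while add[index] < mems[0][index]:
    yield mems
    mems = next(memories)
be
for mems in memories:
    if add[index] >= mems[0][index]:
        break
    yield mems
? Avoiding a manual next is usually better, and this way you do not run into StopIteration errors. You may need to add a for/else to catch the case where the loop ends without a break.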
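To make the for/else concrete, here is one way the whole method might look with that loop. This is a sketch, not a drop-in replacement: `add_page` is a hypothetical free-function stand-in for `_add_values`, and using `itertools.dropwhile` in place of the second `while` loop is my own substitution rather than something the question's code does.

```python
from itertools import chain, dropwhile
from typing import Iterable, Iterator, List

Page = List[tuple]  # hypothetical alias for one page of rows


def add_page(adds: Page, memories: Iterable[Page], index: int = -1) -> Iterator[Page]:
    """Yield the pages of ``memories`` with ``adds`` inserted in order."""
    memories = iter(memories)
    add = adds[0]
    for mems in memories:
        if add[index] >= mems[0][index]:
            break
        yield mems  # this page is strictly newer than ``adds``
    else:
        # The loop ended without a break: ``adds`` is older than every page.
        yield adds
        return
    yield adds
    # Drop the pages that ``adds`` already absorbed, then pass the rest through.
    yield from dropwhile(lambda page: page[0] in adds, chain([mems], memories))


pages = list(add_page([(6,), (5,), (4,)], [[(9,), (8,), (7,)], [(6,), (5,), (4,)], [(3,), (2,), (1,)]]))
print(pages)
```

The `else` branch covers the case the original handled by catching StopIteration from the first `next`, and an exhausted `dropwhile` covers the case where every remaining page overlaps.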
https://codereview.stackexchange.com/questions/263038