我有很多时间序列数据进入一个postgres。时间序列数据进入消息队列,由使用者脚本读取并插入数据库,数据在一天内连续地流入系统,负载保持相当一致。
保存数据的表在时间戳上进行分区,并且只被此脚本写入。它是从其他服务机构读取的。
我想知道是否可以/适当地使用一个长时间运行的副本直接将数据复制到DB,或者更好地对结果进行批处理并运行多个插入/短期副本。
我使用的是Python和psycopg3驱动程序,它似乎有一个非常干净的界面来复制数据。我在想一种解决办法,如下所示:
def packets() -> AsyncGenerator[..., None]:
"""
Infinite generator that yields packets from the incoming stream.
"""
while True:
yield ...
async with cursor.copy("COPY data FROM STDIN") as copy:
for packet in packets():
await copy.write(packet)我目前有一个更类似于以下内容的解决方案:
def insert(packets: Sequence[...]) -> None:
"""bulk insert packets into the DB"""
async with pool.connection() as conn:
conn.executemany("...", packets)
buffer = []
for packet in packets():
buffer.append(packet)
if len(buffer) == 100:
insert(buffer)
buffer.clear()上面的两种Python解决方案都很幼稚,我知道可以做很多事情来优化这两种解决方案(特别是批处理)。
编辑:我想我在最初的问题中并没有说得特别清楚,数据是实时进入系统的,并且没有停止。每当我看到所讨论的插入速度时,复制几乎总是要快得多,但是它们通常是比较时间来恢复一个DB或其他将运行并最终完成的任务,这个过程只是运行了一段不确定的时间,持续地摄取数据。
发布于 2023-02-13 07:03:49
无论您在一个COPY语句中加载了多少行,它总是一个事务。因此,在语句完成之前,数据是不可见的。此外,长时间运行事务也是不明智的。
因此,我的建议是使用COPY,但是要对加载的行进行批处理,这样单个COPY语句就不会超过几秒钟。
https://dba.stackexchange.com/questions/323350
复制相似问题