首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >postgresql wal log无法自动归档的一则案例

postgresql wal log无法自动归档的一则案例

原创
作者头像
保持热爱奔赴山海
发布2026-01-05 21:48:43
发布2026-01-05 21:48:43
1330
举报
文章被收录于专栏:数据库相关数据库相关

最近遇到一则seatunnel + postgresql 逻辑复制出现wal log无法归档(清理)的情况。

生产案例

1、seatunnel里面订阅了postgresql某个业务上的表(以t1为例)

2、最近几天t1表没有数据变动(写入、更新、删除 这几类操作都没有,因为对应的业务逻辑下掉了)

3、数据库里面有其他的写入非常频繁的日志表(也就是会产生大量的wal log)

PostgreSQL 的 WAL 清理原则是:只要有一个消费者(比如复制槽)还没消费到某条 WAL,那这条 WAL 及之前的所有 WAL 都不能删。

临时的处理:

将max_slot_wal_keep_size值调整下,例如改为51200(即50GB)或者更大些,具体需要根据自己的pg的繁忙程度来定。

也可以等本次wal log对接问题处理完成后,将max_slot_wal_keep_size稍后继续改为-1

⚠️注意:

改完这个参数后,稍等片刻PG就会把wal log 强制释放掉,即使有活跃的复制槽,也会强制删除最旧的WAL日志,这可能导致复制槽失效。因此调整此参数需要谨慎,确保不会影响正在进行的逻辑复制。

好在我这个场景下,本来业务上这个表就已经不继续写入数据了,因此这么暴力操作一下,也没啥问题。

处理完成后,还需要完善下监控,之前只监控了复制槽是否active,没有监控wal log的体积,这次也给补上。

1、可以在自己的数据库巡检平台中,添加自己加巡检sql

代码语言:txt
复制
		-- 这个sql在复制disable的情况下拿不到数据
		SELECT 
			application_name,
			state,
			sync_state,
			pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS sent_lag,
			pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), write_lsn)) AS write_lag,
			pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), flush_lsn)) AS flush_lag,
			pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)) AS replay_lag
		FROM pg_stat_replication;

		或者用下面这个sql
		-- 在复制停止或延迟的情况下都能抓取到数据 【推荐用这个SQL做巡检】
		SELECT 
			slot_name,
			slot_type,
			active,
			database,
			restart_lsn,
			confirmed_flush_lsn,
			pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)) AS lag_bytes
		FROM pg_replication_slots;

2、可以在postgresql_exporter中加自定义的query语句

代码语言:txt
复制
类似
# pg_wal_slot_lag_bytes
pg_wal_slot_lag_bytes:
query: |
    SELECT slot_name,pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes FROM pg_replication_slots;
metrics:
    - slot_name:
        usage: "LABEL"
        description: "Replication slot name"
    - lag_bytes:
        usage: "GAUGE"
        description: "Bytes of WAL retained due to replication slot"

实验

可以使用docker起一套逻辑复制的pg环境。

代码语言:txt
复制
docker-compose.yml  内容如下

services:
  pg-pub:
    image: postgres:15-alpine
    ports: ["1921:5432"]
    environment:
      POSTGRES_PASSWORD: mypass
    command: >
      postgres -c wal_level=logical
               -c max_replication_slots=4
               -c max_wal_senders=4
  pg-sub:
    image: postgres:15-alpine
    ports: ["1922:5432"]
    environment:
      POSTGRES_PASSWORD: mypass

发布端

代码语言:txt
复制
docker exec -ti pg_demo-pg-pub-1 bash

psql -Upostgres

-- 创建测试表
CREATE TABLE test_lr (
	id SERIAL PRIMARY KEY,
	data TEXT,
	created_at TIMESTAMP DEFAULT NOW()
);

-- 创建发布
CREATE PUBLICATION pub_test FOR TABLE test_lr;

订阅端

代码语言:txt
复制
docker exec -ti pg_demo-pg-sub-1 bash

psql -Upostgres

-- 创建相同结构的表(主键必须一致!)
CREATE TABLE test_lr (
	id SERIAL PRIMARY KEY,
	data TEXT,
	created_at TIMESTAMP DEFAULT NOW()
);

-- 创建订阅(指向发布端)
CREATE SUBSCRIPTION sub_test 
CONNECTION 'host=pg-pub port=5432 dbname=postgres user=postgres password=mypass' 
PUBLICATION pub_test;

发布端

代码语言:txt
复制
查看复制槽信息
postgres=# select * from pg_replication_slots \gx
-[ RECORD 1 ]-------+----------
slot_name           | sub_test
plugin              | pgoutput
slot_type           | logical
datoid              | 5
database            | postgres
temporary           | f
active              | t
active_pid          | 89
xmin                | 
catalog_xmin        | 729
restart_lsn         | 0/1544DB8
confirmed_flush_lsn | 0/1544DF0
wal_status          | reserved
safe_wal_size       | 
two_phase           | f


插入几条测试数据
postgres=# insert into test_lr (data) values ('test');
INSERT 0 1
postgres=# insert into test_lr (data) values ('test');
INSERT 0 1
postgres=# insert into test_lr (data) values ('test');
INSERT 0 1
postgres=# select * from test_lr;
 id | data |         created_at         
----+------+----------------------------
  1 | test | 2026-01-05 12:34:43.316483
  2 | test | 2026-01-05 12:34:43.897279
  3 | test | 2026-01-05 12:34:44.194527
(3 rows)

订阅端

代码语言:txt
复制
可以看到数据已经同步过来了
postgres=# select * from test_lr;
 id | data |         created_at         
----+------+----------------------------
  1 | test | 2026-01-05 12:34:43.316483
  2 | test | 2026-01-05 12:34:43.897279
  3 | test | 2026-01-05 12:34:44.194527
(3 rows)

逻辑复制环境就这样搭建完成了。

下面,来模拟 订阅端不消费的情况(消费太慢也是类似的情况)

订阅端

代码语言:txt
复制
禁用订阅
ALTER SUBSCRIPTION sub_test DISABLE;

发布端

代码语言:txt
复制
在发布端查看复制槽的状态
postgres=#  select * from pg_replication_slots \gx
-[ RECORD 1 ]-------+----------
slot_name           | sub_test
plugin              | pgoutput
slot_type           | logical
datoid              | 5
database            | postgres
temporary           | f
active              | f   --> 可以看到这里变成了f
active_pid          | 
xmin                | 
catalog_xmin        | 732
restart_lsn         | 0/1545218
confirmed_flush_lsn | 0/1545250
wal_status          | reserved
safe_wal_size       | 
two_phase           | f


在发布端快速插入10w条数据
INSERT INTO test_lr (data) SELECT 'data_' || g FROM generate_series(1, 100000) g;

在发布端观察复制槽是否堆积
SELECT 
	slot_name, 
	active,
	restart_lsn,
	confirmed_flush_lsn,
	pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_slot_lag
FROM pg_replication_slots;

可以看到 replication_slot_lag 的值不是0了,复制有延迟了。

然后,在发布端再插入10w条记录,再次查看发布端的情况
INSERT INTO test_lr (data) SELECT 'data_' || g FROM generate_series(1, 100000) g;

SELECT
	slot_name,
	active,
	restart_lsn,
	confirmed_flush_lsn,
	pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_slot_lag
FROM pg_replication_slots;

可以看到 replication_slot_lag 的值再次增加了。


select count(*) from test_lr;
 count  
--------
 200003
(1 row)



可以看到 restart_lsn 停止前进,replication_slot_lag 越来越大,说明 WAL 无法回收,磁盘可能爆掉!


至此,实验完成,将订阅端的暂停取消掉,稍等几秒钟,数据就追平了
ALTER SUBSCRIPTION sub_test enable;

select count(*) from test_lr;
 count  
--------
 200003
(1 row)


后记

我这里本次遇到的问题,目前不清楚是seatunnel的配置姿势上的问题还是seatunnel本身有问题。

目前网上看到的解决方案就是seatunnel在消费业务表的时候,同时也消费一张心跳表,这个心跳表只有一条数据,每隔5-10分钟会insert一条数据,从而触发 WAL 生成并推动复制槽的 LSN 前进。

心跳表的初始化和后续心跳数据写入,可以通过数据库运维平台进行(或者通过crontab或pg_cron都可以),心跳表的逻辑类似如下:

代码语言:txt
复制
-- 建表并加唯一索引
CREATE TABLE IF NOT EXISTS public.cdc_heartbeat (
    id SERIAL PRIMARY KEY,
    job_name TEXT NOT NULL DEFAULT 'default',
    last_heartbeat TIMESTAMP WITH TIME ZONE NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE UNIQUE INDEX IF NOT EXISTS idx_cdc_heartbeat_job ON public.cdc_heartbeat (job_name);


-- 插入数据类似如下
INSERT INTO public.cdc_heartbeat (job_name, last_heartbeat)
VALUES ('test-cdc-heartbeat', NOW())
ON CONFLICT (job_name) DO UPDATE SET last_heartbeat = NOW();

效果如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产案例
  • 实验
  • 后记
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档