
An algorithm data warehouse is characterized by high cardinality, uneven spatio-temporal distribution, and pronounced temporal locality. In our face-recognition project, once a single table had accumulated 380 million sample records, VACUUM took 18 hours, and a simple SELECT * ... WHERE created_at > NOW() - INTERVAL '1 day' had to scan the full 480 GB of data, running for 4 minutes 12 seconds.
| Bottleneck | Behavior with a single table | Severity | Root cause |
|---|---|---|---|
| Index bloat | Primary-key index grew to 97 GB at only 62% density | ⭐⭐⭐⭐⭐ | MVCC leaves large numbers of dead tuples |
| Lock granularity | Queries blocked while the whole table was being vacuumed | ⭐⭐⭐⭐⭐ | Table-level locking rules out concurrent maintenance |
| I/O amplification | Time-range queries read every data block | ⭐⭐⭐⭐ | No physical data isolation |
| Maintenance window | A full backup took 16 hours | ⭐⭐⭐⭐ | Whole-table data copy |
We chose a composite partitioning strategy: level one partitions by RANGE on created_at (one partition per day), level two by LIST on project_id. This matches the workload: more than 90% of queries carry a time filter, and data from different projects must be logically isolated.
-- Parent partitioned table (declarative partitioning)
DROP TABLE IF EXISTS algorithm_samples CASCADE;
CREATE TABLE algorithm_samples (
sample_id BIGINT NOT NULL,
project_id VARCHAR(50) NOT NULL,
data_path TEXT,
label_status VARCHAR(20),
feature_vector JSONB,
quality_score NUMERIC(5,4),
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP,
metadata JSONB,
-- The primary key must include every partition key column
PRIMARY KEY (sample_id, created_at, project_id)
) PARTITION BY RANGE (created_at);
-- Level-1 partitions (one per day)
CREATE TABLE samples_2024q1d01 PARTITION OF algorithm_samples
FOR VALUES FROM ('2024-01-01 00:00:00') TO ('2024-01-02 00:00:00')
PARTITION BY LIST (project_id);
CREATE TABLE samples_2024q1d02 PARTITION OF algorithm_samples
FOR VALUES FROM ('2024-01-02 00:00:00') TO ('2024-01-03 00:00:00')
PARTITION BY LIST (project_id);
-- Level-2 partitions (one per project)
CREATE TABLE samples_2024q1d01_pface PARTITION OF samples_2024q1d01
FOR VALUES IN ('face_recognition_v2');
CREATE TABLE samples_2024q1d01_pvehicle PARTITION OF samples_2024q1d01
FOR VALUES IN ('vehicle_detection');
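Before automating partition creation, it is worth confirming that partition pruning kicks in for the dominant time-filtered queries. A quick check against the example partitions above (a sketch; the exact plan depends on data and settings):
-- Only partitions overlapping the predicate should appear in the plan
EXPLAIN (COSTS OFF)
SELECT count(*)
FROM algorithm_samples
WHERE created_at >= '2024-01-01' AND created_at < '2024-01-02'
  AND project_id = 'face_recognition_v2';
-- Expected: a scan on samples_2024q1d01_pface only, with all other partitions pruned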
-- Function to create partitions automatically
CREATE OR REPLACE FUNCTION create_daily_partitions(
    start_date DATE,
    end_date DATE
) RETURNS INTEGER AS $$
DECLARE
    -- "current_date" is reserved in SQL, so use a different variable name
    part_date DATE := start_date;
    partitions_created INTEGER := 0;
    partition_name TEXT;
    project TEXT;
    active_projects TEXT[] := ARRAY['face_recognition_v2', 'vehicle_detection', 'ocr_recognition'];
BEGIN
    WHILE part_date < end_date LOOP
        partition_name := 'samples_' || to_char(part_date, 'YYYYMMDD');
        -- Level-1 partition (per day)
        IF NOT EXISTS (
            SELECT 1 FROM pg_tables WHERE tablename = partition_name
        ) THEN
            EXECUTE format(
                'CREATE TABLE %I PARTITION OF algorithm_samples
                 FOR VALUES FROM (%L) TO (%L) PARTITION BY LIST (project_id)',
                partition_name,
                part_date,
                part_date + 1
            );
            -- Level-2 partition for each active project
            FOREACH project IN ARRAY active_projects LOOP
                EXECUTE format(
                    'CREATE TABLE %I PARTITION OF %I FOR VALUES IN (%L)',
                    partition_name || '_' || replace(project, '_', ''),
                    partition_name,
                    project
                );
            END LOOP;
            partitions_created := partitions_created + 1;
        END IF;
        part_date := part_date + 1;
    END LOOP;
    RETURN partitions_created;
END;
$$ LANGUAGE plpgsql;
-- Pre-create partitions for the next 30 days
SELECT create_daily_partitions(
    CURRENT_DATE,
    CURRENT_DATE + 30   -- adding an integer keeps the argument a DATE
);
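To keep partitions created ahead of incoming data, the function can also be scheduled nightly. A minimal sketch using pg_cron (assuming that extension is available; job name and schedule are arbitrary):
SELECT cron.schedule(
    'precreate-sample-partitions',   -- job name (arbitrary)
    '0 1 * * *',                     -- every day at 01:00
    $$SELECT create_daily_partitions(CURRENT_DATE, CURRENT_DATE + 7)$$
);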
To keep the online service running with zero downtime, we migrated with a dual-write plus incremental catch-up approach. It moved all 380 million records within 3 days, with the data-consistency verification reporting 100% agreement.

# migration/dual_writer.py
import psycopg2
from psycopg2.extras import execute_batch
import threading
from queue import Queue
import time
from datetime import datetime, timedelta
import logging
class AlgorithmSamplesDualWriter:
"""算法样本双写管理器"""
def __init__(self, old_dsn: str, new_dsn: str):
# 连接旧单表数据库
self.old_conn = psycopg2.connect(old_dsn)
self.old_conn.autocommit = False
# 连接新分区表数据库
self.new_conn = psycopg2.connect(new_dsn)
self.new_conn.autocommit = False
# 配置日志
self.logger = logging.getLogger(__name__)
# 统计信息
self.stats = {
'old_success': 0,
'new_success': 0,
'new_failed': 0,
'latency_old_ms': 0,
'latency_new_ms': 0
}
self.stats_lock = threading.Lock()
# 异常队列用于补偿
self.failed_queue = Queue(maxsize=10000)
    def write_sample(self, sample_data: dict, async_mode: bool = False):
        """Write one sample to both tables."""
        if async_mode:
            # Asynchronous dual write: the new table is written in the
            # background so the hot path is not blocked.
            threading.Thread(
                target=self._write_new_synchronous,
                args=(sample_data,),
                daemon=True
            ).start()
            # Only wait for the old-table write; the new-table write runs in the background
            return self._write_old_synchronous(sample_data)
        else:
            # Synchronous dual write for strong consistency
            return self._write_both_synchronous(sample_data)
def _write_old_synchronous(self, data: dict):
"""同步写入旧表"""
sql = """
INSERT INTO algorithm_samples_old (
sample_id, project_id, data_path, label_status,
feature_vector, quality_score, created_at, updated_at, metadata
) VALUES (
%(sample_id)s, %(project_id)s, %(data_path)s, %(label_status)s,
%(feature_vector)s, %(quality_score)s, %(created_at)s, %(updated_at)s, %(metadata)s
) ON CONFLICT (sample_id) DO UPDATE SET
updated_at = EXCLUDED.updated_at,
label_status = EXCLUDED.label_status,
quality_score = EXCLUDED.quality_score
"""
start = time.time()
try:
with self.old_conn.cursor() as cur:
cur.execute(sql, data)
self.old_conn.commit()
latency = (time.time() - start) * 1000
with self.stats_lock:
self.stats['old_success'] += 1
self.stats['latency_old_ms'] += latency
return True
except Exception as e:
self.logger.error(f"Old table write failed: {e}", exc_info=True)
self.old_conn.rollback()
return False
def _write_new_synchronous(self, data: dict):
"""同步写入新分区表"""
sql = """
INSERT INTO algorithm_samples (
sample_id, project_id, data_path, label_status,
feature_vector, quality_score, created_at, updated_at, metadata
) VALUES (
%(sample_id)s, %(project_id)s, %(data_path)s, %(label_status)s,
%(feature_vector)s, %(quality_score)s, %(created_at)s, %(updated_at)s, %(metadata)s
) ON CONFLICT (sample_id, created_at, project_id) DO UPDATE SET
updated_at = EXCLUDED.updated_at,
label_status = EXCLUDED.label_status,
quality_score = EXCLUDED.quality_score
"""
start = time.time()
try:
with self.new_conn.cursor() as cur:
cur.execute(sql, data)
self.new_conn.commit()
latency = (time.time() - start) * 1000
with self.stats_lock:
self.stats['new_success'] += 1
self.stats['latency_new_ms'] += latency
return True
except Exception as e:
self.logger.error(f"New table write failed: {e}", exc_info=True)
self.new_conn.rollback()
# 放入失败队列用于补偿
try:
self.failed_queue.put_nowait({
'data': data,
'timestamp': datetime.now(),
'error': str(e)
})
except:
self.logger.warning("Failed queue is full, dropping failed record")
with self.stats_lock:
self.stats['new_failed'] += 1
return False
    def _write_both_synchronous(self, data: dict):
        """Synchronous dual write using two-phase commit for atomicity.

        Note: PREPARE TRANSACTION requires max_prepared_transactions > 0
        on both PostgreSQL servers.
        """
        insert_old = (
            "INSERT INTO algorithm_samples_old (sample_id, project_id, data_path, label_status, "
            "feature_vector, quality_score, created_at, updated_at, metadata) "
            "VALUES (%(sample_id)s, %(project_id)s, %(data_path)s, %(label_status)s, "
            "%(feature_vector)s, %(quality_score)s, %(created_at)s, %(updated_at)s, %(metadata)s)"
        )
        insert_new = insert_old.replace("algorithm_samples_old", "algorithm_samples")
        old_xid = self.old_conn.xid(0, f"old_tx_{data['sample_id']}", "dual_writer")
        new_xid = self.new_conn.xid(0, f"new_tx_{data['sample_id']}", "dual_writer")
        try:
            # Phase 1: execute the inserts and prepare both transactions
            self.old_conn.tpc_begin(old_xid)
            with self.old_conn.cursor() as cur:
                cur.execute(insert_old, data)
            self.old_conn.tpc_prepare()
            self.new_conn.tpc_begin(new_xid)
            with self.new_conn.cursor() as cur:
                cur.execute(insert_new, data)
            self.new_conn.tpc_prepare()
            # Phase 2: commit both prepared transactions
            self.old_conn.tpc_commit()
            self.new_conn.tpc_commit()
            return True
        except Exception as e:
            self.logger.error(f"Two-phase commit failed: {e}", exc_info=True)
            # Roll back whatever is still pending on either connection
            for conn in (self.old_conn, self.new_conn):
                try:
                    conn.tpc_rollback()
                except Exception:
                    conn.rollback()
            return False
def get_stats(self):
"""获取双写统计信息"""
with self.stats_lock:
stats_snapshot = self.stats.copy()
if stats_snapshot['old_success'] > 0:
stats_snapshot['avg_latency_old_ms'] = (
stats_snapshot['latency_old_ms'] / stats_snapshot['old_success']
)
if stats_snapshot['new_success'] > 0:
stats_snapshot['avg_latency_new_ms'] = (
stats_snapshot['latency_new_ms'] / stats_snapshot['new_success']
)
return stats_snapshot
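The failed_queue above collects failed new-table writes, but nothing in the class drains it. A minimal compensation worker might look like the following sketch (not part of the original code; it relies on _write_new_synchronous re-queuing records that still fail):
def replay_failed_writes(writer: AlgorithmSamplesDualWriter):
    """Drain the failed queue once, retrying each record against the new table."""
    # Snapshot the backlog first: _write_new_synchronous re-enqueues on failure,
    # so records that keep failing are simply picked up by the next run.
    backlog = writer.failed_queue.qsize()
    retried = succeeded = 0
    for _ in range(backlog):
        item = writer.failed_queue.get()
        retried += 1
        if writer._write_new_synchronous(item['data']):
            succeeded += 1
    writer.logger.info("Replayed %d failed writes, %d succeeded", retried, succeeded)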
# Historical data migration script
def migrate_historical_data(start_date: str, batch_size: int = 50000):
    """Migrate historical rows from the old table into the partitioned table."""
    # Connection settings
    old_dsn = "postgresql://admin:old_pass@old-host:5432/ml_platform"
    new_dsn = "postgresql://admin:new_pass@new-host:5432/ml_platform"
    conn_old = psycopg2.connect(old_dsn)
    conn_new = psycopg2.connect(new_dsn)
    # A server-side (named) cursor avoids loading the whole result set into memory
    cursor_old = conn_old.cursor('migration_cursor')
    # Select the rows to migrate; start_date is passed as a bound parameter
    cursor_old.execute("""
        SELECT sample_id, project_id, data_path, label_status,
               feature_vector, quality_score, created_at, updated_at, metadata
        FROM algorithm_samples_old
        WHERE created_at >= %s
        ORDER BY created_at, sample_id
    """, (start_date,))
total_migrated = 0
batch = []
while True:
rows = cursor_old.fetchmany(batch_size)
if not rows:
break
batch = rows
start_time = time.time()
# 批量插入新分区表
with conn_new.cursor() as cur:
execute_batch(cur, """
INSERT INTO algorithm_samples VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
""", batch)
conn_new.commit()
elapsed = time.time() - start_time
total_migrated += len(batch)
print(f"Migrated {total_migrated:,} rows, batch speed: {len(batch)/elapsed:.0f} rows/s")
# 每100批检查一次数据一致性
if total_migrated % (batch_size * 100) == 0:
verify_consistency(conn_old, conn_new, sample_ids=[r[0] for r in batch[:10]])
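verify_consistency is called above but not shown in the original listing; a minimal sketch (spot-checking a few ids on key columns; the column list is illustrative) could be:
def verify_consistency(conn_old, conn_new, sample_ids):
    """Spot-check that migrated rows match the source table."""
    check_sql = ("SELECT label_status, quality_score, created_at "
                 "FROM {} WHERE sample_id = %s")
    for sid in sample_ids:
        with conn_old.cursor() as c_old, conn_new.cursor() as c_new:
            c_old.execute(check_sql.format("algorithm_samples_old"), (sid,))
            c_new.execute(check_sql.format("algorithm_samples"), (sid,))
            if c_old.fetchone() != c_new.fetchone():
                raise ValueError(f"Consistency check failed for sample_id={sid}")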
# 启动双写模式
if __name__ == '__main__':
dual_writer = AlgorithmSamplesDualWriter(
old_dsn="postgresql://admin:old_pass@old-host/ml_platform",
new_dsn="postgresql://admin:new_pass@new-host/ml_platform"
)
# 模拟样本写入
sample = {
'sample_id': 123456789,
'project_id': 'face_recognition_v2',
'data_path': '/data/faces/123456.jpg',
'label_status': 'pending',
'feature_vector': '{"embedding": [0.1, 0.2, ...]}',
'quality_score': 0.95,
'created_at': datetime.now(),
'updated_at': datetime.now(),
'metadata': '{"source": "camera_01"}'
}
dual_writer.write_sample(sample, async_mode=True)
# 打印统计
time.sleep(2)
stats = dual_writer.get_stats()
print(f"Stats: {stats}")
Before partitioning, index bloat on the main table had reached 320% and index-scan performance had dropped roughly tenfold. Analysis with pgstattuple and pg_stat_user_indexes showed:
| Index | Original size | Bloated size | Bloat ratio | New dead tuples per day |
|---|---|---|---|---|
|  | 12 GB | 38 GB | 317% | 2.8 million |
|  | 8 GB | 29 GB | 362% | 1.9 million |
|  | 6 GB | 22 GB | 366% | 1.5 million |
Index maintenance was consuming 40% of the total write cost, and vacuuming the indexes alone took more than 2 hours.
-- Full diagnostic script for index bloat
CREATE EXTENSION IF NOT EXISTS pgstattuple;
-- Table bloat overview
SELECT
    schemaname, relname,
    pg_size_pretty(pg_total_relation_size((schemaname||'.'||relname)::regclass)) as total_size,
    n_live_tup, n_dead_tup,
    round(n_dead_tup * 100.0 / NULLIF(n_live_tup + n_dead_tup, 0), 2) as dead_tuple_ratio
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_dead_tup DESC;
-- Index usage
SELECT
    schemaname, relname, indexrelname,
    idx_scan, idx_tup_read, idx_tup_fetch,
    pg_size_pretty(pg_relation_size(indexrelid)) as index_size
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan DESC;
-- Physical health of a single index (pgstatindex reports leaf density and fragmentation)
SELECT
    avg_leaf_density,
    leaf_fragmentation,
    pg_size_pretty(index_size) as index_size
FROM pgstatindex('idx_sample_id');
-- Sample output
┌─────────────┬────────────┬────────────┬────────────┬──────────┬────────────┐
│ schemaname │ tablename │ total_size │ n_live_tup │ n_dead_tup│ dead_tuple_ratio│
├─────────────┼────────────┼────────────┼────────────┼──────────┼────────────┤
│ public │ algorithm_samples │ 584 GB │ 385M │ 92M │ 19.3% │
└─────────────┴────────────┴────────────┴────────────┴──────────┴────────────┘
Based on the query-pattern analysis, we designed differentiated index strategies for the different business scenarios:
| Query scenario | Index size | Query speedup | Maintenance cost |
|---|---|---|---|
| Primary-key point lookup | 2.1 GB | 15% | down 30% |
| Project + status filter | 180 MB | 120x | down 80% |
| Time-range aggregation | 45 MB | 3x | down 90% |
| Tag search | 850 MB | 8x | unchanged |
| Quality-score sorting | 1.2 GB | 25x | down 40% |
-- Scenario 1: labeling workbench (only ever queries pending samples)
-- A full index over every status wastes ~95% of its space; a partial
-- covering index keeps only what the workbench reads
CREATE INDEX idx_pending_samples ON algorithm_samples
    (project_id, created_at, quality_score DESC)
    INCLUDE (data_path, metadata)
    WHERE label_status = 'pending';
-- Scenario 2: read-only analytics (historical data rarely changes)
-- A BRIN index trades precision for a tiny footprint
CREATE INDEX idx_created_at_brin ON algorithm_samples
    USING BRIN (created_at, project_id)
    WITH (pages_per_range = 128);
-- Scenario 3: high-concurrency primary-key lookups
-- INCLUDE columns avoid heap fetches
CREATE UNIQUE INDEX idx_sample_id_covering ON algorithm_samples
    (sample_id, created_at, project_id)
    INCLUDE (data_path, label_status, quality_score);
-- Scenario 4: tag array search
-- A GIN index on the tags key only, to avoid over-indexing
CREATE INDEX idx_tags_gin ON algorithm_samples
    USING GIN ((metadata->'tags'))
    WHERE metadata->'tags' IS NOT NULL;
-- Scenario 5: tiered quality-score queries (common thresholds 0.8, 0.9, 0.95)
-- Partial indexes per tier
CREATE INDEX idx_high_quality ON algorithm_samples
    (project_id, quality_score)
    WHERE quality_score >= 0.9;
CREATE INDEX idx_medium_quality ON algorithm_samples
    (project_id, quality_score)
    WHERE quality_score BETWEEN 0.7 AND 0.9;
On top of the indexes themselves, we implemented index health monitoring with automatic rebuilds:
# maintenance/index_maintenance.py
import logging
import time
from datetime import datetime, timedelta

import psycopg2
import schedule


class IndexMaintenanceManager:
    """Index maintenance manager."""

    def __init__(self, db_config: dict):
        self.conn = psycopg2.connect(**db_config)
        self.cursor = self.conn.cursor()
        self.logger = logging.getLogger(__name__)
        # Index health thresholds
        self.bloat_threshold = 50            # flag indexes bloated by more than 50%
        self.density_threshold = 70          # flag leaf density below 70%
        self.dead_tuple_threshold = 1000000  # flag more than 1M dead tuples
    def analyze_index_health(self):
        """Analyse the health of every user B-tree index."""
        # pg_stat_user_indexes exposes relname/indexrelname (not tablename/indexname);
        # pgstatindex() only supports B-tree indexes, so filter on the access method.
        self.cursor.execute("""
            SELECT
                s.schemaname,
                s.relname      AS tablename,
                s.indexrelname AS indexname,
                s.idx_scan,
                pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size,
                p.avg_leaf_density,
                p.leaf_fragmentation
            FROM pg_stat_user_indexes s
            JOIN pg_class c ON c.oid = s.indexrelid
            JOIN pg_am am ON am.oid = c.relam AND am.amname = 'btree'
            CROSS JOIN LATERAL pgstatindex(s.indexrelid::regclass) p
            WHERE s.schemaname = 'public'
            ORDER BY pg_relation_size(s.indexrelid) DESC
        """)
results = self.cursor.fetchall()
unhealthy_indexes = []
for row in results:
(schema, table, index, scans, size, density, fragmentation) = row
# 判断健康度
if density and density < self.density_threshold:
unhealthy_indexes.append({
'index': index,
'reason': f'Density too low: {density}%',
'size': size,
'priority': 'high'
})
if fragmentation and fragmentation > 30:
unhealthy_indexes.append({
'index': index,
'reason': f'Fragmentation high: {fragmentation}%',
'size': size,
'priority': 'medium'
})
# 低频访问大索引
if scans < 1000 and 'GB' in size:
unhealthy_indexes.append({
'index': index,
'reason': f'Low usage: {scans} scans',
'size': size,
'priority': 'low'
})
return unhealthy_indexes
    def rebuild_index_concurrently(self, index_name: str):
        """Rebuild an index without blocking writes (REINDEX ... CONCURRENTLY)."""
        try:
            # Check that the index exists
            self.cursor.execute("""
                SELECT indexname
                FROM pg_indexes
                WHERE indexname = %s AND schemaname = 'public'
            """, (index_name,))
            if not self.cursor.fetchone():
                self.logger.warning(f"Index {index_name} not found")
                return False
            # REINDEX CONCURRENTLY cannot run inside a transaction block,
            # so switch the connection to autocommit for this statement.
            start_time = time.time()
            self.logger.info(f"Starting concurrent rebuild of {index_name}")
            self.conn.commit()
            old_autocommit = self.conn.autocommit
            self.conn.autocommit = True
            try:
                self.cursor.execute(f'REINDEX INDEX CONCURRENTLY "{index_name}"')
            finally:
                self.conn.autocommit = old_autocommit
            elapsed = time.time() - start_time
            self.logger.info(f"Rebuild completed in {elapsed:.2f}s: {index_name}")
            return True
        except Exception as e:
            self.logger.error(f"Rebuild failed for {index_name}: {e}")
            self.conn.rollback()
            return False
def schedule_maintenance(self):
"""调度维护任务"""
def daily_maintenance():
"""每日凌晨执行轻量维护"""
self.logger.info("Starting daily index maintenance")
# 分析统计信息
self.cursor.execute("ANALYZE algorithm_samples")
self.conn.commit()
# 检查膨胀最严重的3个索引
unhealthy = self.analyze_index_health()
high_priority = [idx for idx in unhealthy if idx['priority'] == 'high'][:3]
for idx in high_priority:
self.logger.warning(f"High priority index issue: {idx}")
# 在低峰期重建
self.rebuild_index_concurrently(idx['index'])
def weekly_maintenance():
"""每周日凌晨执行深度维护"""
self.logger.info("Starting weekly deep maintenance")
# 重建所有BRIN索引(轻量级)
self.cursor.execute("""
SELECT indexname FROM pg_indexes
WHERE indexname LIKE 'idx_%_brin'
AND schemaname = 'public'
""")
brin_indexes = self.cursor.fetchall()
for (idx_name,) in brin_indexes:
self.rebuild_index_concurrently(idx_name)
# 调度任务
schedule.every().day.at("02:00").do(daily_maintenance)
schedule.every().sunday.at("03:00").do(weekly_maintenance)
while True:
schedule.run_pending()
time.sleep(60)
# 使用示例
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.INFO)
maintainer = IndexMaintenanceManager({
'host': 'localhost',
'database': 'ml_platform',
'user': 'admin',
'password': 'secure_pass'
})
# 手动触发一次健康检查
issues = maintainer.analyze_index_health()
print(f"Found {len(issues)} unhealthy indexes:")
for issue in issues[:5]: # 显示前5个
print(f" - {issue['index']}: {issue['reason']} ({issue['size']})")
# 启动定时维护
maintainer.schedule_maintenance()
In an algorithm data warehouse, poorly chosen data types can waste more than 300% in storage and cost 50% in query performance. We once used a JSONB column to store a fixed-structure quality score, adding 16 bytes of overhead per row and 6.1 GB across the whole table.
| Field | Wrong type | Better type | Savings per row | Savings across 380M rows |
|---|---|---|---|---|
| Sample ID | VARCHAR(50) | BIGINT | 42 bytes | 16 GB |
| Quality score | NUMERIC(5,4) | REAL | 12 bytes | 4.6 GB |
| Label status | VARCHAR(20) | CHAR(12) | 8 bytes | 3 GB |
| Feature vector | JSONB (784-dim) | FLOAT4[] | 2.8 KB | 1.06 TB |
| Metadata | TEXT | JSONB (compressed) | 200 bytes | 76 GB |
| Timestamp | TIMESTAMP(3) | TIMESTAMP | 4 bytes | 1.5 GB |
-- Before: loosely typed columns
CREATE TABLE samples_bad (
    sample_id VARCHAR(50) PRIMARY KEY,  -- average length 22 bytes plus varlena overhead
    quality_score NUMERIC(5,4),         -- roughly 16 bytes per value
    feature_embedding JSONB,            -- a 784-dim vector takes ~11 KB
    created_at TIMESTAMP(3)
);
-- After: compact types
CREATE TABLE samples_optimized (
    sample_id BIGSERIAL PRIMARY KEY,    -- 8 bytes
    quality_score REAL,                 -- 4 bytes, precision is sufficient
    feature_embedding FLOAT4[784],      -- ~3.1 KB as a fixed-length array
    created_at TIMESTAMP,               -- 8 bytes
    label_status CHAR(12)               -- 12-byte fixed length
) USING pg_tde; -- optional transparent encryption (requires the pg_tde extension)
-- Feature-vector storage optimization
-- A native array instead of JSONB saves ~70% of the space
CREATE OR REPLACE FUNCTION store_sample_embedding(
sample_id BIGINT,
embedding FLOAT8[784]
) RETURNS VOID AS $$
DECLARE
compressed_embedding FLOAT4[784];
BEGIN
-- Down-cast to single precision (accuracy loss < 0.1%)
compressed_embedding := ARRAY(
SELECT value::FLOAT4
FROM unnest(embedding) AS value
);
INSERT INTO samples_optimized (sample_id, feature_embedding)
VALUES (sample_id, compressed_embedding)
ON CONFLICT (sample_id) DO UPDATE SET
feature_embedding = EXCLUDED.feature_embedding;
END;
$$ LANGUAGE plpgsql;
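A quick way to exercise the function (a sketch with a random 784-dimensional vector; the id is arbitrary):
SELECT store_sample_embedding(
    42,
    ARRAY(SELECT random() FROM generate_series(1, 784))
);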
-- Verify the effect of the type changes
CREATE TABLE type_comparison (
id SERIAL,
numeric_score NUMERIC(5,4),
real_score REAL,
float_array FLOAT4[10],
json_array JSONB
);
INSERT INTO type_comparison
SELECT
i,
random()::NUMERIC(5,4),
random()::REAL,
array_agg(random()::FLOAT4),
to_jsonb(array_agg(random()::FLOAT8))
FROM generate_series(1, 10000) i
CROSS JOIN generate_series(1, 10) j
GROUP BY i;
-- Compare storage footprints
SELECT
    pg_size_pretty(pg_total_relation_size('type_comparison')) as table_size,
    pg_size_pretty(pg_indexes_size('type_comparison')) as index_size;
On the application side, the same compression is wrapped in a custom SQLAlchemy type:

# models/sample.py
from sqlalchemy import TypeDecorator, Numeric
from sqlalchemy.dialects.postgresql import JSONB
import numpy as np
import json

class CompressedEmbedding(TypeDecorator):
    """Compressed feature-vector column type."""
    impl = JSONB
    cache_ok = True
def process_bind_param(self, value, dialect):
"""存储前压缩"""
if value is None:
return None
# FLOAT8[784] -> FLOAT4[784] 降精度
if isinstance(value, np.ndarray):
if value.dtype == np.float64:
value = value.astype(np.float32)
# 二进制编码进一步压缩
return {
'dtype': 'float32',
'shape': value.shape,
'data': value.tobytes().hex()
}
return value
def process_result_value(self, value, dialect):
"""读取后解压"""
if value is None:
return None
if isinstance(value, dict) and 'data' in value:
# 从二进制恢复
data = bytes.fromhex(value['data'])
return np.frombuffer(data, dtype=np.float32).reshape(value['shape'])
return value
# 使用示例
from sqlalchemy import Column, BigInteger
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class AlgorithmSample(Base):
__tablename__ = 'samples_optimized'
sample_id = Column(BigInteger, primary_key=True)
feature_embedding = Column(CompressedEmbedding, nullable=True)
quality_score = Column(Numeric(5, 4))
def set_embedding(self, embedding_array):
"""设置特征向量(自动压缩)"""
if embedding_array.dtype == np.float64:
embedding_array = embedding_array.astype(np.float32)
self.feature_embedding = embedding_array
# 创建表时指定类型
from sqlalchemy import create_engine, Table, MetaData
engine = create_engine('postgresql://admin:pass@localhost/ml_platform')
metadata = MetaData()
samples_table = Table(
'samples_optimized',
metadata,
Column('sample_id', BigInteger, primary_key=True),
Column('feature_embedding', CompressedEmbedding),
Column('quality_score', Numeric(5, 4))
)
metadata.create_all(engine)
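A short usage sketch for the custom type (reusing the engine defined above; values are illustrative):
from sqlalchemy.orm import Session

with Session(engine) as session:
    sample = AlgorithmSample(sample_id=1)
    sample.set_embedding(np.random.rand(784))   # float64 input, stored as float32
    session.merge(sample)
    session.commit()
    loaded = session.get(AlgorithmSample, 1)
    print(loaded.feature_embedding.dtype, loaded.feature_embedding.shape)  # float32 (784,)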
Query patterns against the warehouse fall into three groups: sample selection, quality analysis, and labeling-task assignment. Capturing the top 20 slow queries with pg_stat_statements showed that 85% of the performance problems came from inaccurate statistics and poorly tuned memory parameters.
-- Top 5 slow queries
SELECT
query,
calls,
total_exec_time / 1000 as total_sec,
mean_exec_time as avg_ms,
max_exec_time as max_ms,
rows / calls as avg_rows
FROM pg_stat_statements
WHERE query LIKE '%algorithm_samples%'
ORDER BY total_exec_time DESC
LIMIT 5;
-- Typical output
┌────────────────────────────────┬────────┬──────────┬──────────┬──────────┬──────────┐
│ query │ calls │ total_sec│ avg_ms │ max_ms │ avg_rows │
├────────────────────────────────┼────────┼──────────┼──────────┼──────────┼──────────┤
│ SELECT * FROM algorithm_samples│ 125,432│ 45,823 │ 365 │ 8,234 │ 1 │
│ WHERE sample_id = $1 │ │ │ │ │ │
├────────────────────────────────┼────────┼──────────┼──────────┼──────────┼──────────┤
│ SELECT COUNT(*) FROM algorithm_│ 8,923 │ 12,456 │ 1,396 │ 4,567 │ 1 │
│ samples WHERE project_id = $1 │ │ │ │ │ │
├────────────────────────────────┼────────┼──────────┼──────────┼──────────┼──────────┤
│ SELECT * FROM algorithm_samples│ 45,678 │ 89,234 │ 1,952 │ 15,678 │ 234 │
│ WHERE created_at > $1 ORDER BY │ │ │ │ │ │
│ quality_score DESC LIMIT 100 │ │ │ │ │ │
└────────────────────────────────┴────────┴──────────┴──────────┴──────────┴──────────┘
The default statistics collection cannot cope with this skewed data, so we raise per-column statistics targets and add multi-column (extended) statistics to get accurate row estimates:
-- Raise the statistics target for skewed columns
ALTER TABLE algorithm_samples
    ALTER COLUMN label_status SET STATISTICS 1000;
ALTER TABLE algorithm_samples
    ALTER COLUMN quality_score SET STATISTICS 500;
-- Multi-column statistics capture correlations between columns
CREATE STATISTICS st_project_status (dependencies)
    ON project_id, label_status FROM algorithm_samples;
CREATE STATISTICS st_created_quality (dependencies)
    ON created_at, quality_score FROM algorithm_samples;
CREATE STATISTICS st_ndistinct (ndistinct)
    ON project_id, label_status FROM algorithm_samples;
-- Skip statistics collection entirely for the large JSONB column
ALTER TABLE algorithm_samples ALTER COLUMN feature_vector SET STATISTICS 0;
-- Analyze a specific partition instead of the whole table
ANALYZE samples_2024q1d01;
-- Verify that the collected statistics look sensible
SELECT
    schemaname, tablename, attname,
    n_distinct, correlation,
    most_common_vals, most_common_freqs
FROM pg_stats
WHERE (tablename LIKE 'samples_%' OR tablename = 'algorithm_samples')
    AND attname IN ('project_id', 'label_status')
ORDER BY schemaname, tablename, attname;
We then adjust memory parameters dynamically according to the query type:
# db/query_optimizer.py
import logging
import math

from sqlalchemy import create_engine, event, text
from sqlalchemy.orm import Session


class QueryOptimizer:
    """Query optimizer that adjusts execution parameters dynamically."""

    def __init__(self, engine):
        self.engine = engine
        self.logger = logging.getLogger(__name__)
        self.setup_event_listeners()
        # Derive memory parameters from the data volume and server size
        self.total_samples = self.get_table_count()
        self.optimal_work_mem = self.calculate_optimal_work_mem()
        self.optimal_maintenance_work_mem = self.calculate_maintenance_mem()
def setup_event_listeners(self):
"""设置SQLAlchemy事件监听"""
@event.listens_for(self.engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
"""在查询前设置会话级参数"""
query_type = self.classify_query(statement)
# 根据查询类型设置参数
if query_type == 'aggregation':
# 大聚合查询分配更多内存
cursor.execute("SET LOCAL work_mem = '256MB'")
cursor.execute("SET LOCAL hash_mem_multiplier = 2.0")
elif query_type == 'sort':
# 排序查询
cursor.execute("SET LOCAL work_mem = '128MB'")
elif query_type == 'index_scan':
# 索引扫描,使用默认内存
cursor.execute("SET LOCAL work_mem = '16MB'")
elif query_type == 'sequential_scan':
# 全表扫描,限制内存避免抖动
cursor.execute("SET LOCAL work_mem = '4MB'")
def classify_query(self, sql: str) -> str:
"""简单查询分类"""
sql_upper = sql.upper().strip()
if 'GROUP BY' in sql_upper or 'COUNT(' in sql_upper:
return 'aggregation'
elif 'ORDER BY' in sql_upper and 'LIMIT' in sql_upper:
return 'sort'
elif 'WHERE sample_id' in sql_upper:
return 'index_scan'
else:
return 'sequential_scan'
def calculate_optimal_work_mem(self) -> str:
"""基于数据量计算work_mem"""
# 公式:work_mem = 总内存 / (并发连接数 * 3)
total_memory_gb = 128 # 服务器内存
max_connections = 200
work_mem_mb = (total_memory_gb * 1024) / (max_connections * 3)
work_mem_mb = min(work_mem_mb, 512) # 上限512MB
return f"{int(work_mem_mb)}MB"
def calculate_maintenance_mem(self) -> str:
"""计算维护内存"""
# 维护内存可以较大
return "4GB"
    def apply_optimal_settings(self):
        """Apply the computed settings server-wide."""
        # ALTER SYSTEM cannot run inside a transaction block, so use an
        # autocommit connection.
        with self.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            conn.execute(text(f"ALTER SYSTEM SET work_mem = '{self.optimal_work_mem}'"))
            conn.execute(text(f"ALTER SYSTEM SET maintenance_work_mem = '{self.optimal_maintenance_work_mem}'"))
            conn.execute(text("ALTER SYSTEM SET effective_cache_size = '80GB'"))
            conn.execute(text("ALTER SYSTEM SET random_page_cost = 1.1"))  # SSD-friendly
            # Reload the configuration so the new settings take effect
            conn.execute(text("SELECT pg_reload_conf()"))
        self.logger.info(f"Applied work_mem: {self.optimal_work_mem}")

    def get_table_count(self) -> int:
        """Total row count of the main table."""
        with self.engine.connect() as conn:
            result = conn.execute(text("SELECT COUNT(*) FROM algorithm_samples"))
            return result.scalar()
# Usage example
engine = create_engine('postgresql://admin:pass@localhost/ml_platform')
optimizer = QueryOptimizer(engine)
# Apply the computed settings
optimizer.apply_optimal_settings()
# Run a query; the session-level parameters are set automatically
with Session(engine) as session:
    result = session.execute(text("""
        SELECT project_id, COUNT(*), AVG(quality_score)
        FROM algorithm_samples
        WHERE created_at >= NOW() - INTERVAL '7 days'
        GROUP BY project_id
    """))
    # Because the query contains GROUP BY, work_mem is raised to 256MB for it
    print(result.fetchall())
The repository layer then packages these patterns into reusable query methods:
# queries/optimized_queries.py
from sqlalchemy import text

from db.query_optimizer import QueryOptimizer


class SampleRepository:
    """Repository exposing the optimized query paths."""

    def __init__(self, engine):
        self.engine = engine
        self.optimizer = QueryOptimizer(engine)

    def get_high_quality_samples(self, project_id: str, limit: int = 100):
        """Fetch high-quality samples (optimized ordering)."""
        # Before: SELECT * ... ORDER BY quality_score DESC LIMIT n
        #   -> no usable index, full scan plus an in-memory sort
        # After: partial covering indexes plus a keyset-friendly ordering
        sql = text("""
            SELECT sample_id, data_path, quality_score, metadata
            FROM algorithm_samples
            WHERE project_id = :project_id
              AND quality_score > 0.85
            ORDER BY quality_score DESC, sample_id
            LIMIT :limit
        """)
        # Bound parameters also protect against SQL injection
        with self.engine.connect() as conn:
            result = conn.execute(sql, {'project_id': project_id, 'limit': limit})
            return [dict(row._mapping) for row in result]

    def get_sample_batch(self, sample_ids: list):
        """Fetch many samples in one round trip (optimized IN query)."""
        # Before: one query per id in a Python loop
        # After: a single query joining against UNNEST of the id array
        sql = text("""
            WITH requested_ids AS (
                SELECT UNNEST(CAST(:sample_ids AS BIGINT[])) AS sample_id
            )
            SELECT s.*
            FROM requested_ids r
            JOIN algorithm_samples s ON r.sample_id = s.sample_id
            WHERE s.created_at >= CURRENT_DATE - INTERVAL '30 days'
        """)
        with self.engine.connect() as conn:
            result = conn.execute(sql, {'sample_ids': sample_ids})
            return {row._mapping['sample_id']: dict(row._mapping) for row in result}

    def get_daily_stats(self, project_id: str, days: int = 7):
        """Per-day statistics (optimized aggregation)."""
        # make_interval() lets the day count stay a bound parameter
        sql = text("""
            SELECT
                DATE(created_at) AS day,
                COUNT(*) AS total_samples,
                COUNT(*) FILTER (WHERE label_status = 'completed') AS completed_count,
                AVG(quality_score) AS avg_quality,
                PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY quality_score) AS median_quality
            FROM algorithm_samples
            WHERE project_id = :project_id
              AND created_at >= CURRENT_DATE - make_interval(days => :days)
            GROUP BY DATE(created_at)
            ORDER BY day
        """)
        with self.engine.connect() as conn:
            result = conn.execute(sql, {'project_id': project_id, 'days': days})
            return [dict(row._mapping) for row in result]
# Cache layer on top of the repository
import json

import redis


class CachedSampleRepository(SampleRepository):
    """Sample repository with a Redis cache in front of the aggregate queries."""

    def __init__(self, engine, redis_client):
        super().__init__(engine)
        self.redis = redis_client

    def get_project_stats(self, project_id: str):
        """Project statistics, cached in Redis for one hour."""
        # Redis (with its TTL) is the single cache here; an in-process lru_cache
        # would keep serving stale results long after the TTL had expired.
        cache_key = f"stats:{project_id}"
        # Try the cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)
        # Cache miss: hit the database
        result = super().get_daily_stats(project_id, days=30)
        # Store for one hour; default=str handles date and Decimal values
        self.redis.setex(cache_key, 3600, json.dumps(result, default=str))
        return result
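A usage sketch (Redis location and connection URL are illustrative):
import redis
from sqlalchemy import create_engine

engine = create_engine('postgresql://admin:pass@localhost/ml_platform')
redis_client = redis.Redis(host='localhost', port=6379, db=0)
repo = CachedSampleRepository(engine, redis_client)
stats = repo.get_project_stats('face_recognition_v2')   # first call hits PostgreSQL
stats = repo.get_project_stats('face_recognition_v2')   # second call is served from Redis
print(stats[:3])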
Appendix: one-click deployment script
#!/bin/bash
# deploy_ml_data_warehouse.sh
set -e
PG_VERSION=${1:-14}
DB_NAME=${2:-ml_platform}
BACKUP_S3_BUCKET=${3:-ml-platform-backups}
echo "=== 部署算法数据仓库 (PostgreSQL $PG_VERSION) ==="
# 1. 安装PostgreSQL
sudo apt-get update
sudo apt-get install -y postgresql-$PG_VERSION postgresql-contrib-$PG_VERSION
# 2. 安装扩展
sudo apt-get install -y postgresql-$PG_VERSION-pgstattuple postgresql-$PG_VERSION-pgBackRest
# 3. 初始化数据库
sudo -u postgres createdb $DB_NAME
sudo -u postgres psql $DB_NAME -c "CREATE EXTENSION IF NOT EXISTS pgstattuple;"
# 4. 应用配置
sudo cp config/postgresql.conf /etc/postgresql/$PG_VERSION/main/
sudo cp config/pgbackrest.conf /etc/pgbackrest/
# 5. 创建分区表
python3 scripts/create_partitions.py --db-name $DB_NAME --days 30
# 6. 创建索引
psql $DB_NAME -f scripts/create_indexes.sql
# 7. 设置备份
sudo systemctl enable postgresql
sudo systemctl start postgresql
# 8. 初始化备份仓库
sudo pgbackrest --stanza=$DB_NAME stanza-create
sudo pgbackrest --stanza=$DB_NAME backup
# 9. 部署监控
kubectl apply -f k8s/pg-exporter.yaml
kubectl apply -f k8s/backup-cronjob.yaml
echo "部署完成!访问 http://grafana.local 查看监控"
# One-shot restore script
cat > restore.sh <<EOF
#!/bin/bash
# Restore from the latest backup
PG_VERSION=$PG_VERSION
DB_NAME=$DB_NAME
sudo systemctl stop postgresql
sudo rm -rf /var/lib/postgresql/$PG_VERSION/main/*
sudo pgbackrest --stanza=$DB_NAME restore
sudo systemctl start postgresql
echo "Restore finished; the database is back at the latest backup point"
EOF
chmod +x restore.sh