
模型训练好了,怎么部署上线?这是90%的AI项目卡壳的地方。
本文分享我过去一年的AI部署实践经验,涵盖三种主流方案,附带完整代码。
| 方案 | 适用场景 | 成本 | 难度 | 延迟 |
|---|---|---|---|---|
| 云端API | 通用场景,快速上线 | 按量付费 | 低 | 100-500ms |
| 本地服务器 | 数据敏感,高频调用 | 固定成本 | 中 | 10-100ms |
| 边缘部署 | 实时要求,离线环境 | 硬件成本 | 高 | <10ms |
选型建议:
使用FastAPI封装模型,Docker打包,部署到云服务器。
ai-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI入口
│ ├── model.py # 模型加载和推理
│ └── config.py # 配置文件
├── models/ # 存放模型文件
│ └── model.onnx
├── Dockerfile
├── requirements.txt
└── docker-compose.yml
app/model.py - 模型封装
import onnxruntime as ort
import numpy as np
from typing import List, Dict
class Predictor:
    """ONNX Runtime model wrapper: load once, serve batch predictions.

    The pre/post-processing hooks are model-specific templates. They raise
    NotImplementedError instead of silently returning None (the original
    `pass` made `predict` fail later with a confusing error in session.run).
    """

    def __init__(self, model_path: str):
        # Create the ONNX Runtime session once; session creation is
        # expensive, so it must not happen per request.
        self.session = ort.InferenceSession(model_path)
        # Cache the first input's name for the session.run() feed dict.
        self.input_name = self.session.get_inputs()[0].name

    def predict(self, texts: List[str]) -> List[Dict]:
        """Run preprocess -> inference -> postprocess for a batch of texts."""
        inputs = self._preprocess(texts)
        # First arg None = return all model outputs; we use only the first.
        outputs = self.session.run(None, {self.input_name: inputs})
        return self._postprocess(outputs[0])

    def _preprocess(self, texts: List[str]) -> np.ndarray:
        """Convert raw texts into the model's input tensor (model-specific)."""
        # Implement tokenization / vectorization for your model here.
        raise NotImplementedError("implement model-specific preprocessing")

    def _postprocess(self, outputs: np.ndarray) -> List[Dict]:
        """Turn the raw output tensor into one result dict per input text."""
        raise NotImplementedError("implement model-specific postprocessing")

# Module-level singleton: the model is loaded once at import time.
predictor = Predictor("models/model.onnx")
app/main.py - API服务
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
from .model import predictor
# FastAPI application plus request/response schemas for the prediction API.
app = FastAPI(title="AI模型服务", version="1.0")

class PredictRequest(BaseModel):
    # Batch of raw input texts to run through the model.
    texts: List[str]

class PredictResponse(BaseModel):
    # One result dict per input text.
    results: List[dict]
    # Server-side inference latency in milliseconds.
    latency_ms: float
@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    """Run batch inference and report server-side latency.

    Returns the model results plus the measured inference time in ms;
    maps any inference failure to an HTTP 500 carrying the error message.
    """
    import time  # kept function-local so the snippet stays self-contained

    # perf_counter() is monotonic and designed for measuring intervals;
    # time.time() can jump backwards/forwards when the wall clock is adjusted.
    start = time.perf_counter()
    try:
        results = predictor.predict(request.texts)
    except Exception as e:
        # Surface inference errors to the client as a 500.
        raise HTTPException(status_code=500, detail=str(e))
    latency = (time.perf_counter() - start) * 1000
    return {
        "results": results,
        "latency_ms": round(latency, 2)
    }
@app.get("/health")
async def health():
    """Health check: service is up; reports whether the model singleton exists."""
    return {"status": "ok", "model_loaded": predictor is not None}
Dockerfile
# Slim Python base keeps the image small.
FROM python:3.9-slim
WORKDIR /app
# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code and model artifacts.
COPY app/ ./app/
COPY models/ ./models/
# Port served by uvicorn.
EXPOSE 8000
# Start the ASGI server.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
version: '3.8'
services:
  ai-api:
    build: .
    ports:
      - "8000:8000"   # host:container
    environment:
      - MODEL_PATH=/app/models/model.onnx
      - WORKERS=4
    # GPU reservation — NOTE(review): honored by `docker compose` with the
    # NVIDIA container toolkit installed; confirm against your compose version.
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped
# Build the image
docker build -t ai-api:latest .
# Local smoke test
docker run -p 8000:8000 ai-api:latest
# Production: run detached under compose
docker-compose up -d
import requests
# Call the deployed prediction API over HTTP.
response = requests.post(
    "http://localhost:8000/predict",
    json={"texts": ["这是一条测试文本"]}
)
result = response.json()
print(f"推理结果: {result['results']}")
print(f"延迟: {result['latency_ms']}ms")
对于高并发场景,需要GPU加速和批处理优化。
import asyncio
from concurrent.futures import ThreadPoolExecutor
import torch
class GPUInferencer:
def __init__(self, model_path: str, batch_size: int = 32):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = torch.load(model_path, map_location=self.device)
self.model.eval()
self.batch_size = batch_size
self.executor = ThreadPoolExecutor(max_workers=4)
async def predict_batch(self, texts: List[str]) -> List[Dict]:
# 分批处理
batches = [texts[i:i+self.batch_size]
for i in range(0, len(texts), self.batch_size)]
results = []
for batch in batches:
# 异步推理避免阻塞
result = await asyncio.get_event_loop().run_in_executor(
self.executor, self._infer, batch
)
results.extend(result)
return results
def _infer(self, batch: List[str]) -> List[Dict]:
with torch.no_grad():
# 转移到GPU
inputs = self._preprocess(batch).to(self.device)
outputs = self.model(inputs)
return self._postprocess(outputs) [负载均衡 Nginx]
↓
[API服务 x3] ← 负载均衡
↓
[Redis 队列]
↓
[GPU推理服务] ← 单卡/多卡
docker-compose.gpu.yml
version: '3.8'
services:
  # Reverse proxy / load balancer in front of the API replicas.
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api
  # Stateless API layer, scaled horizontally.
  api:
    build: .
    deploy:
      replicas: 3
    environment:
      - REDIS_URL=redis://redis:6379
      - GPU_SERVICE_URL=http://gpu-inference:8001
  # Dedicated GPU inference service.
  gpu-inference:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    # NOTE(review): `runtime: nvidia` (legacy) and `deploy.resources` below
    # both request GPU access — confirm which one your compose version honors.
    runtime: nvidia
    environment:
      - NVIDIA_VISIBLE_DEVICES=0,1
      - BATCH_SIZE=64
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 2
              capabilities: [gpu]
  # Queue / cache backing store.
  redis:
    image: redis:alpine
将模型转换为ONNX格式,部署到边缘设备。
import torch
import torch.onnx
# PyTorch模型转ONNX
def export_onnx(model, dummy_input, output_path):
torch.onnx.export(
model,
dummy_input,
output_path,
input_names=["input"],
output_names=["output"],
dynamic_axes={
"input": {0: "batch_size"},
"output": {0: "batch_size"}
},
opset_version=11
)
print(f"ONNX模型已保存: {output_path}")
# 使用
export_onnx(model, dummy_input, "model.onnx")
import onnxruntime as ort
class EdgeInference:
    """ONNX Runtime inference wrapper for edge devices.

    Picks the best available execution provider (CUDA when usable, CPU
    otherwise) and enables full graph-level optimization.
    """

    def __init__(self, model_path: str):
        # get_available_providers() reports what this runtime can actually
        # use; the documented way to detect CUDA support. (get_device() only
        # reflects the installed build flavor, not usable providers.)
        if "CUDAExecutionProvider" in ort.get_available_providers():
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        # Enable all graph optimizations (constant folding, node fusion, ...).
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = \
            ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        self.session = ort.InferenceSession(
            model_path,
            sess_options,
            providers=providers
        )

    def infer(self, input_data: np.ndarray) -> np.ndarray:
        """Run the model on `input_data` and return its first output."""
        input_name = self.session.get_inputs()[0].name
        return self.session.run(None, {input_name: input_data})[0]
from onnxruntime.quantization import quantize_dynamic, QuantType
# Dynamic INT8 quantization: weights stored as int8, dequantized on the fly.
quantize_dynamic(
    model_input="model.onnx",
    model_output="model_int8.onnx",
    weight_type=QuantType.QInt8
)
# Model size typically shrinks to ~1/4 of the original; inference 2-3x faster.
# Prometheus metrics
# Prometheus metrics exposed for scraping (request volume, latency, model state).
from prometheus_client import Counter, Histogram, Gauge

request_count = Counter('ai_requests_total', '总请求数')
request_latency = Histogram('ai_request_duration_seconds', '请求延迟')
model_loaded = Gauge('ai_model_loaded', '模型是否加载')

@app.get("/ready")
async def readiness():
    """K8s readiness probe: 503 until the model singleton is loaded."""
    if predictor is None:
        raise HTTPException(status_code=503)
    return {"ready": True}

@app.get("/live")
async def liveness():
    """K8s liveness probe: the process is up and serving."""
    return {"alive": True}
from fastapi.security import HTTPBearer
from fastapi import Depends  # was missing: Depends() below raised NameError
from slowapi import Limiter

# Rate limiting — NOTE(review): a constant key_func throttles ALL clients in
# one shared bucket; use the client IP (slowapi.util.get_remote_address) for
# per-client limits.
limiter = Limiter(key_func=lambda: "global")

# Bearer-token authentication dependency.
security = HTTPBearer()

@app.post("/predict")
@limiter.limit("100/minute")  # at most 100 calls per minute
async def predict(request: PredictRequest, token: str = Depends(security)):
    """Authenticated, rate-limited prediction endpoint (token check TODO)."""
    # Validate `token` against your auth backend before running inference.
    pass方案:预加载 + 延迟加载
@app.on_event("startup")
async def load_model():
    """Load the model once at service startup so the first request is not slow."""
    global predictor
    # Predictor.__init__ requires the model path (see app/model.py);
    # calling Predictor() with no argument raises a TypeError.
    predictor = Predictor("models/model.onnx")  # loaded at service startup
方案:模型分片 / 动态批处理 / 使用ONNX
# Dynamically back off the batch size on GPU OOM: halve and retry.
# (Original assigned `max_batch` but looped on an undefined `batch_size`,
# and `while batch_size > 1` never retried at batch size 1.)
batch_size = 64  # largest batch we are willing to try
while batch_size >= 1:
    try:
        # NOTE(review): in real code, re-slice `batch` down to `batch_size`
        # before predicting — shown schematically here.
        result = model.predict(batch)
        break  # success: stop retrying
    except RuntimeError:  # OOM
        # Halve and retry; 1 // 2 == 0 ends the loop, so a persistent
        # OOM is not retried forever.
        batch_size //= 2
方案:异步 + 队列 + 缓存
# Cache inference results in Redis, keyed by a digest of the input text.
def get_cache_key(text: str) -> str:
    """Build a stable Redis key ("ai:<md5-of-text>") for a given input text."""
    import hashlib
    digest = hashlib.md5(text.encode())
    return "ai:" + digest.hexdigest()
# On a cache hit, return the stored result immediately (illustrative fragment:
# `redis`, `cache_key` and `json` come from the surrounding endpoint code).
result = redis.get(cache_key)
if result:
    return json.loads(result)
1. 模型导出 → ONNX/TorchScript
2. 服务封装 → FastAPI/Flask
3. 容器化 → Docker/Docker Compose
4. 云端部署 → AWS/GCP/阿里云
5. 监控告警 → Prometheus + Grafana
6. 持续集成 → GitHub Actions/GitLab CI
完整代码已开源:github.com/yourname/ai-deployment-template
有部署问题欢迎评论区留言。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。