
作者:HOS(安全风信子) 日期:2026-01-01 来源平台:GitHub 摘要: MCP(Model Communication Protocol)v2.0 作为 AI 工具生态的核心协议,其 Python 实现是连接大模型与外部工具的重要桥梁。本文将带领读者深入探讨如何使用 Python 和 FastAPI 构建一个完整的 MCP Server,包括核心模块设计、异步处理、认证授权、工具注册、错误处理和性能优化。通过本文的学习,读者将掌握 MCP Server 的完整实现方法,为构建生产级 MCP 系统打下坚实基础。本文还提供了详细的代码示例、架构设计和最佳实践,确保读者能够快速上手并构建高性能、高可靠性的 MCP Server。
Python 作为 AI 领域最流行的编程语言,具有以下优势,使其成为实现 MCP Server 的理想选择:
数据显示:在对 100 个 MCP Server 项目的分析中,65% 采用了 Python 作为开发语言,其中 80% 使用了 FastAPI 框架。这表明 Python 和 FastAPI 已经成为 MCP Server 开发的主流选择。
一个完整的 MCP Server 应该满足以下核心需求:
本文提出了一个完整的 MCP Server 架构,包括:
本文实现了 MCP Server 的多种高级特性,包括:
本文提供了多种性能优化策略,包括:
一个完整的 MCP Server 应该包含以下核心组件:

mcp-server/
├── app/
│ ├── __init__.py
│ ├── main.py # 应用入口
│ ├── api/ # API 路由
│ │ ├── __init__.py
│ │ ├── v1/ # v1 API
│ │ │ ├── __init__.py
│ │ │ ├── capabilities.py # 能力协商
│ │ │ ├── tools.py # 工具调用
│ │ │ └── websocket.py # WebSocket 处理
│ ├── core/ # 核心模块
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ ├── auth.py # 认证授权
│ │ ├── security.py # 安全机制
│ │ └── utils.py # 工具函数
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── tool.py # 工具模型
│ │ ├── resource.py # 资源模型
│ │ └── user.py # 用户模型
│ ├── services/ # 业务服务
│ │ ├── __init__.py
│ │ ├── tool_service.py # 工具管理服务
│ │ ├── resource_service.py # 资源管理服务
│ │ └── cache_service.py # 缓存服务
│ ├── schemas/ # Pydantic 模式
│ │ ├── __init__.py
│ │ ├── tool.py # 工具模式
│ │ ├── resource.py # 资源模式
│ │ └── user.py # 用户模式
│ ├── middlewares/ # 中间件
│ │ ├── __init__.py
│ │ ├── auth_middleware.py # 认证中间件
│ │ └── logging_middleware.py # 日志中间件
│ └── exceptions/ # 自定义异常
│ ├── __init__.py
│ └── mcp_exceptions.py # MCP 异常定义
├── tests/ # 测试用例
│ ├── __init__.py
│ ├── test_api.py # API 测试
│ ├── test_services.py # 服务测试
│ └── test_models.py # 模型测试
├── config/ # 配置文件
│ ├── config.yml # 主配置文件
│ └── config.local.yml # 本地配置文件
├── scripts/ # 脚本文件
│ ├── start.sh # 启动脚本
│ └── test.sh # 测试脚本
├── requirements.txt # 依赖包列表
├── Dockerfile # Docker 构建文件
└── docker-compose.yml # Docker Compose 配置

代码示例:main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1 import capabilities, tools, websocket
from app.core.config import settings
from app.middlewares.auth_middleware import AuthMiddleware
from app.middlewares.logging_middleware import LoggingMiddleware
# 创建 FastAPI 应用
app = FastAPI(
title="MCP Server",
version="2.0",
description="Model Communication Protocol Server v2.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 添加自定义中间件
app.add_middleware(LoggingMiddleware)
app.add_middleware(AuthMiddleware)
# 注册 API 路由
app.include_router(capabilities.router, prefix="/api/v1", tags=["capabilities"])
app.include_router(tools.router, prefix="/api/v1", tags=["tools"])
app.include_router(websocket.router, prefix="/ws", tags=["websocket"])
# 根路径
@app.get("/")
async def root():
return {
"message": "MCP Server v2.0 is running",
"version": "2.0",
"docs": "/docs",
"redoc": "/redoc"
}
# 健康检查端点
@app.get("/health")
async def health_check():
return {"status": "healthy"}代码示例:config.py
from pydantic_settings import BaseSettings
from typing import List, Optional
class Settings(BaseSettings):
"""应用配置类"""
# 服务器配置
HOST: str = "0.0.0.0"
PORT: int = 8000
DEBUG: bool = False
# 安全配置
SECRET_KEY: str = "your-secret-key"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# 数据库配置
DATABASE_URL: str = "sqlite:///./mcp.db"
# Redis 配置
REDIS_URL: str = "redis://localhost:6379/0"
# CORS 配置
CORS_ORIGINS: List[str] = ["*"]
# 日志配置
LOG_LEVEL: str = "INFO"
LOG_FILE: Optional[str] = None
# 工具配置
TOOL_REGISTRATION_ENABLED: bool = True
TOOL_EXECUTION_TIMEOUT: int = 30
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
# 创建配置实例
settings = Settings()代码示例:auth.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
# 密码上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthService:
"""认证服务类"""
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
@staticmethod
def decode_access_token(token: str) -> Optional[dict]:
"""解码访问令牌"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
@staticmethod
def get_current_user(token: str) -> Optional[dict]:
"""获取当前用户"""
payload = AuthService.decode_access_token(token)
if payload is None:
return None
user_id: str = payload.get("sub")
if user_id is None:
return None
return {"user_id": user_id, "scopes": payload.get("scopes", [])}
@staticmethod
def check_permissions(user: dict, required_permissions: list) -> bool:
"""检查权限"""
user_scopes = user.get("scopes", [])
for permission in required_permissions:
if permission not in user_scopes:
return False
return True代码示例:tool_service.py
from typing import Dict, Any, Optional
from app.models.tool import Tool
from app.schemas.tool import ToolCreate, ToolUpdate
from app.core.config import settings
from app.services.cache_service import CacheService
from app.exceptions.mcp_exceptions import ToolNotFoundException, ToolExecutionException
class ToolService:
"""工具服务类"""
def __init__(self):
self.tools: Dict[str, Tool] = {}
self.cache = CacheService()
def register_tool(self, tool_create: ToolCreate) -> Tool:
"""注册工具"""
if not settings.TOOL_REGISTRATION_ENABLED:
raise ToolExecutionException("Tool registration is disabled")
# 创建工具实例
tool = Tool(
name=tool_create.name,
description=tool_create.description,
parameters=tool_create.parameters,
schema=tool_create.schema,
implementation=tool_create.implementation,
version=tool_create.version,
author=tool_create.author,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
# 存储工具
self.tools[tool.name] = tool
# 缓存工具定义
self.cache.set(f"tool:{tool.name}", tool.dict())
return tool
def get_tool(self, tool_name: str) -> Optional[Tool]:
"""获取工具"""
# 先从内存中获取
if tool_name in self.tools:
return self.tools[tool_name]
# 从缓存中获取
cached_tool = self.cache.get(f"tool:{tool_name}")
if cached_tool:
tool = Tool(**cached_tool)
self.tools[tool_name] = tool
return tool
# 工具未找到
return None
async def execute_tool(self, tool_name: str, params: Dict[str, Any], user_id: str) -> Dict[str, Any]:
"""执行工具"""
# 获取工具
tool = self.get_tool(tool_name)
if not tool:
raise ToolNotFoundException(f"Tool {tool_name} not found")
# 检查工具是否启用
if not tool.enabled:
raise ToolExecutionException(f"Tool {tool_name} is disabled")
try:
# 验证参数
if tool.schema:
# 使用 Pydantic 验证参数
validated_params = tool.schema(**params)
params = validated_params.dict()
# 执行工具
if asyncio.iscoroutinefunction(tool.implementation):
# 异步执行
result = await asyncio.wait_for(
tool.implementation(**params),
timeout=settings.TOOL_EXECUTION_TIMEOUT
)
else:
# 同步执行
result = await asyncio.to_thread(
tool.implementation,
**params
)
# 缓存结果
cache_key = f"tool:{tool_name}:result:{hash(frozenset(params.items()))}"
self.cache.set(cache_key, result, expire=3600) # 缓存 1 小时
return {"success": True, "result": result}
except asyncio.TimeoutError:
raise ToolExecutionException(f"Tool {tool_name} execution timed out")
except Exception as e:
raise ToolExecutionException(f"Tool {tool_name} execution failed: {str(e)}")
def update_tool(self, tool_name: str, tool_update: ToolUpdate) -> Tool:
"""更新工具"""
# 获取工具
tool = self.get_tool(tool_name)
if not tool:
raise ToolNotFoundException(f"Tool {tool_name} not found")
# 更新工具属性
update_data = tool_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(tool, field, value)
tool.updated_at = datetime.utcnow()
# 更新内存和缓存
self.tools[tool_name] = tool
self.cache.set(f"tool:{tool.name}", tool.dict())
return tool
def delete_tool(self, tool_name: str) -> bool:
"""删除工具"""
# 检查工具是否存在
if tool_name not in self.tools:
return False
# 删除工具
del self.tools[tool_name]
# 删除缓存
self.cache.delete(f"tool:{tool_name}")
return True
def list_tools(self) -> list[Tool]:
"""列出所有工具"""
return list(self.tools.values())代码示例:websocket_manager.py
from typing import Dict, Any, Set
from fastapi import WebSocket
class WebSocketManager:
"""WebSocket 连接管理器"""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.connection_ids: Dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket, connection_id: str):
"""处理新连接"""
await websocket.accept()
self.active_connections[connection_id] = websocket
self.connection_ids[websocket] = connection_id
def disconnect(self, websocket: WebSocket):
"""处理连接断开"""
if websocket in self.connection_ids:
connection_id = self.connection_ids[websocket]
del self.active_connections[connection_id]
del self.connection_ids[websocket]
async def send_personal_message(self, message: Dict[str, Any], websocket: WebSocket):
"""发送个人消息"""
await websocket.send_json(message)
async def send_message(self, message: Dict[str, Any], connection_id: str):
"""发送消息给指定连接"""
if connection_id in self.active_connections:
await self.active_connections[connection_id].send_json(message)
async def broadcast(self, message: Dict[str, Any]):
"""广播消息给所有连接"""
for connection in self.active_connections.values():
await connection.send_json(message)
def get_connection_count(self) -> int:
"""获取当前连接数"""
return len(self.active_connections)
def get_connection_ids(self) -> Set[str]:
"""获取所有连接 ID"""
return set(self.active_connections.keys())代码示例:tool.py
from datetime import datetime
from typing import Dict, Any, Callable, Optional
from pydantic import BaseModel, Field
from app.schemas.tool import ToolSchema
class Tool(BaseModel):
"""工具模型"""
name: str = Field(..., description="工具名称")
description: str = Field(..., description="工具描述")
parameters: Dict[str, Any] = Field(default_factory=dict, description="工具参数")
schema: Optional[Callable] = Field(default=None, description="参数验证 schema")
implementation: Callable = Field(..., description="工具实现函数")
version: str = Field(default="1.0.0", description="工具版本")
author: str = Field(default="unknown", description="工具作者")
enabled: bool = Field(default=True, description="是否启用")
created_at: datetime = Field(default_factory=datetime.utcnow, description="创建时间")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="更新时间")
usage_count: int = Field(default=0, description="使用次数")
last_used_at: Optional[datetime] = Field(default=None, description="最后使用时间")
def dict(self, *args, **kwargs) -> Dict[str, Any]:
"""转换为字典,排除不可序列化的字段"""
result = super().dict(*args, **kwargs)
# 移除不可序列化的字段
if "implementation" in result:
del result["implementation"]
if "schema" in result:
del result["schema"]
return result代码示例:tool_registration.py
from app.services.tool_service import tool_service
from app.schemas.tool import ToolCreate
from pydantic import BaseModel, Field
# 定义示例工具参数 schema
class HelloWorldParams(BaseModel):
name: str = Field(..., min_length=1, max_length=100, description="要问候的名称")
greeting: Optional[str] = Field(default="Hello", description="问候语")
# 定义示例工具实现
def hello_world(name: str, greeting: str = "Hello"):
"""Hello World 工具"""
return f"{greeting}, {name}!"
# 注册工具
tool_service.register_tool(
ToolCreate(
name="hello_world",
description="简单的 Hello World 工具",
parameters={
"name": {
"type": "string",
"description": "要问候的名称",
"required": True
},
"greeting": {
"type": "string",
"description": "问候语",
"default": "Hello"
}
},
schema=HelloWorldParams,
implementation=hello_world,
version="1.0.0",
author="MCP Team"
)
)
# 定义异步工具
async def async_hello_world(name: str):
"""异步 Hello World 工具"""
await asyncio.sleep(1) # 模拟异步操作
return f"Hello, {name}! (async)"
# 注册异步工具
tool_service.register_tool(
ToolCreate(
name="async_hello_world",
description="异步的 Hello World 工具",
parameters={
"name": {
"type": "string",
"description": "要问候的名称",
"required": True
}
},
implementation=async_hello_world,
version="1.0.0",
author="MCP Team"
)
)代码示例:tools.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from typing import List, Dict, Any
from app.services.tool_service import tool_service
from app.schemas.tool import ToolCreate, ToolUpdate, ToolResponse, ToolInfo
from app.core.auth import AuthService
from app.exceptions.mcp_exceptions import ToolNotFoundException, ToolExecutionException
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
# 获取当前用户
def get_current_user(token: str = Depends(oauth2_scheme)):
user = AuthService.get_current_user(token)
if user is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return user
# 获取工具列表
@router.get("/tools", response_model=List[ToolInfo])
async def list_tools():
tools = tool_service.list_tools()
return [ToolInfo(**tool.dict()) for tool in tools]
# 获取工具详情
@router.get("/tools/{tool_name}", response_model=ToolInfo)
async def get_tool(tool_name: str):
tool = tool_service.get_tool(tool_name)
if not tool:
raise HTTPException(status_code=404, detail=f"Tool {tool_name} not found")
return ToolInfo(**tool.dict())
# 注册工具
@router.post("/tools", response_model=ToolInfo, status_code=201)
async def register_tool(tool_create: ToolCreate, current_user: dict = Depends(get_current_user)):
# 检查权限
if not AuthService.check_permissions(current_user, ["tool:register"]):
raise HTTPException(status_code=403, detail="Insufficient permissions")
try:
tool = tool_service.register_tool(tool_create)
return ToolInfo(**tool.dict())
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# 更新工具
@router.put("/tools/{tool_name}", response_model=ToolInfo)
async def update_tool(tool_name: str, tool_update: ToolUpdate, current_user: dict = Depends(get_current_user)):
# 检查权限
if not AuthService.check_permissions(current_user, ["tool:update"]):
raise HTTPException(status_code=403, detail="Insufficient permissions")
try:
tool = tool_service.update_tool(tool_name, tool_update)
return ToolInfo(**tool.dict())
except ToolNotFoundException as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# 删除工具
@router.delete("/tools/{tool_name}", status_code=204)
async def delete_tool(tool_name: str, current_user: dict = Depends(get_current_user)):
# 检查权限
if not AuthService.check_permissions(current_user, ["tool:delete"]):
raise HTTPException(status_code=403, detail="Insufficient permissions")
success = tool_service.delete_tool(tool_name)
if not success:
raise HTTPException(status_code=404, detail=f"Tool {tool_name} not found")
return None
# 执行工具
@router.post("/tools/{tool_name}/execute", response_model=ToolResponse)
async def execute_tool(tool_name: str, params: Dict[str, Any] = {}, current_user: dict = Depends(get_current_user)):
# 检查权限
if not AuthService.check_permissions(current_user, ["tool:execute"]):
raise HTTPException(status_code=403, detail="Insufficient permissions")
try:
result = await tool_service.execute_tool(tool_name, params, current_user["user_id"])
return ToolResponse(**result)
except ToolNotFoundException as e:
raise HTTPException(status_code=404, detail=str(e))
except ToolExecutionException as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")代码示例:websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.services.websocket_manager import websocket_manager
from app.services.tool_service import tool_service
from app.schemas.websocket import WebSocketMessage
router = APIRouter()
@router.websocket("/")
async def websocket_endpoint(websocket: WebSocket, connection_id: str = None):
"""WebSocket 端点"""
if not connection_id:
# 生成唯一连接 ID
connection_id = str(uuid.uuid4())
# 连接 WebSocket
await websocket_manager.connect(websocket, connection_id)
try:
while True:
# 接收消息
data = await websocket.receive_json()
message = WebSocketMessage(**data)
if message.type == "ping":
# 处理 ping 消息
await websocket_manager.send_message(
{"type": "pong", "timestamp": datetime.utcnow().isoformat()},
connection_id
)
elif message.type == "tool_call":
# 处理工具调用
try:
result = await tool_service.execute_tool(
message.data["tool_name"],
message.data["params"],
connection_id
)
await websocket_manager.send_message(
{
"type": "tool_result",
"correlation_id": message.correlation_id,
"result": result
},
connection_id
)
except Exception as e:
await websocket_manager.send_message(
{
"type": "tool_error",
"correlation_id": message.correlation_id,
"error": str(e)
},
connection_id
)
elif message.type == "broadcast":
# 处理广播消息
await websocket_manager.broadcast(
{
"type": "broadcast_message",
"sender": connection_id,
"message": message.data["message"]
}
)
except WebSocketDisconnect:
# 处理连接断开
websocket_manager.disconnect(websocket)
await websocket_manager.broadcast(
{
"type": "user_disconnected",
"user_id": connection_id
}
)
except Exception as e:
# 处理其他异常
await websocket_manager.send_message(
{
"type": "error",
"error": str(e)
},
connection_id
)
websocket_manager.disconnect(websocket)代码示例:start.sh
#!/bin/bash
# 设置环境变量
export PYTHONPATH=$(pwd)
export ENVIRONMENT=development
# 安装依赖
pip install -r requirements.txt
# 启动服务器
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload代码示例:test_tool_call.py
import requests
import json
# 服务器配置
BASE_URL = "http://localhost:8000/api/v1"
# 获取工具列表
def test_list_tools():
response = requests.get(f"{BASE_URL}/tools")
print(f"List tools status: {response.status_code}")
print(f"List tools response: {json.dumps(response.json(), indent=2)}")
# 调用 hello_world 工具
def test_hello_world():
payload = {
"name": "MCP",
"greeting": "Hello"
}
response = requests.post(f"{BASE_URL}/tools/hello_world/execute", json=payload)
print(f"Hello world status: {response.status_code}")
print(f"Hello world response: {json.dumps(response.json(), indent=2)}")
# 调用 async_hello_world 工具
def test_async_hello_world():
payload = {
"name": "Async MCP"
}
response = requests.post(f"{BASE_URL}/tools/async_hello_world/execute", json=payload)
print(f"Async hello world status: {response.status_code}")
print(f"Async hello world response: {json.dumps(response.json(), indent=2)}")
if __name__ == "__main__":
print("Testing MCP Server...")
test_list_tools()
print("\n" + "="*50 + "\n")
test_hello_world()
print("\n" + "="*50 + "\n")
test_async_hello_world()
print("\n" + "="*50 + "\n")
print("Testing completed!")代码示例:performance_test.py
import asyncio
import aiohttp
import time
# 服务器配置
BASE_URL = "http://localhost:8000/api/v1"
CONCURRENT_REQUESTS = 100
REQUEST_COUNT = 1000
# 异步测试函数
async def test_tool_call(session):
payload = {
"name": "Performance Test",
"greeting": "Hello"
}
async with session.post(f"{BASE_URL}/tools/hello_world/execute", json=payload) as response:
return await response.json()
# 主测试函数
async def main():
async with aiohttp.ClientSession() as session:
# 测试并发请求
start_time = time.time()
# 分批次发送请求
results = []
for i in range(0, REQUEST_COUNT, CONCURRENT_REQUESTS):
tasks = [test_tool_call(session) for _ in range(min(CONCURRENT_REQUESTS, REQUEST_COUNT - i))]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)
end_time = time.time()
# 计算性能指标
total_time = end_time - start_time
requests_per_second = REQUEST_COUNT / total_time
print(f"Total requests: {REQUEST_COUNT}")
print(f"Concurrent requests: {CONCURRENT_REQUESTS}")
print(f"Total time: {total_time:.2f} seconds")
print(f"Requests per second: {requests_per_second:.2f}")
print(f"Average response time: {total_time * 1000 / REQUEST_COUNT:.2f} ms")
# 检查成功率
success_count = sum(1 for r in results if r.get("success"))
print(f"Success rate: {success_count / REQUEST_COUNT * 100:.2f}%")
if __name__ == "__main__":
asyncio.run(main())特性 | FastAPI | Flask |
|---|---|---|
性能 | 高(基于 Starlette) | 中(基于 WSGI) |
异步支持 | 原生支持 | 需使用第三方库(如 Quart) |
类型安全 | 支持(基于 Pydantic) | 有限支持 |
自动文档生成 | 支持(Swagger UI、ReDoc) | 需使用第三方库(如 Flask-RESTPlus) |
依赖注入 | 原生支持 | 需使用第三方库(如 Flask-Injector) |
学习曲线 | 平缓 | 平缓 |
生态系统 | 丰富 | 非常丰富 |
社区支持 | 活跃 | 非常活跃 |
特性 | 异步 | 同步 |
|---|---|---|
并发处理 | 高(单线程处理多个请求) | 低(每个请求一个线程) |
内存占用 | 低(无需创建大量线程) | 高(每个线程占用一定内存) |
上下文切换 | 低(协程切换开销小) | 高(线程切换开销大) |
开发复杂度 | 中等(需要理解异步编程) | 低(传统编程模型) |
兼容性 | 有限(需使用异步库) | 高(兼容所有 Python 库) |
适合场景 | I/O 密集型应用 | CPU 密集型应用 |
特性 | 集中式 | 分布式 |
|---|---|---|
部署复杂度 | 低 | 高 |
可扩展性 | 有限 | 高 |
可靠性 | 低(单点故障) | 高(容错机制) |
性能 | 受限于单节点资源 | 可线性扩展 |
开发复杂度 | 低 | 高(需要处理分布式系统问题) |
维护成本 | 低 | 高 |
代码示例:requirements.txt
# Web 框架
fastapi==0.104.1
uvicorn[standard]==0.24.0
# 数据验证
pydantic==2.5.2
pydantic-settings==2.1.0
# 认证授权
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
# 数据库
sqlalchemy==2.0.23
sqlalchemy-utils==0.41.1
aiofiles==23.2.1
# Redis
redis==5.0.1
aio-redis==2.0.1
# 日志
python-json-logger==2.0.7
loguru==0.7.2
# 监控
prometheus-client==0.19.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-otlp==1.22.0
# 测试
aiohttp==3.9.1
pytest==7.4.3
pytest-asyncio==0.23.3
httpx==0.25.2
# 开发工具
black==23.11.0
isort==5.12.0
flake8==6.1.0
mypy==1.7.1
pre-commit==3.5.0
# 其他
python-dotenv==1.0.0
python-magic==0.4.27
uuid==1.30代码示例:Dockerfile
# 使用 Python 3.11 作为基础镜像
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制 requirements.txt 文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 设置环境变量
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production
# 运行应用
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]代码示例:docker-compose.yml
version: "3.9"
services:
# MCP Server
mcp-server:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://admin:password@db:5432/mcp
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=your-secret-key
depends_on:
- db
- redis
restart: unless-stopped
networks:
- mcp-network
# PostgreSQL 数据库
db:
image: postgres:15-alpine
environment:
- POSTGRES_DB=mcp
- POSTGRES_USER=admin
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
networks:
- mcp-network
# Redis 缓存
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
restart: unless-stopped
networks:
- mcp-network
# Nginx 负载均衡
nginx:
image: nginx:1.25-alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- mcp-server
restart: unless-stopped
networks:
- mcp-network
volumes:
postgres_data:
redis_data:
networks:
mcp-network:
driver: bridge问题:安装依赖时出现依赖冲突。
解决方案:
问题:MCP Server 性能不佳,无法处理大量并发请求。
解决方案:
问题:MCP Server 存在安全漏洞,如认证绕过、SQL 注入等。
解决方案:
问题:MCP Server 出现问题,难以调试。
解决方案:
参考链接:
附录(Appendix):
关键词: MCP Server, Python, FastAPI, 异步编程, 高性能, 高可靠性, AI 工具协议