在数字化转型浪潮中,智能在线客服系统已成为企业提升客户服务效率、降低运营成本的关键工具。传统客服系统往往存在响应慢、人力成本高、服务时间有限等问题,而现代智能客服系统通过AI技术实现了7×24小时不间断服务,大幅提升了客户满意度。
源码及演示:zxkfym.top
今天我们将深入探讨一个完整的智能在线客服系统开源实现,该系统集成了AI机器人、工单系统和多语言支持三大核心模块。这个开源项目采用现代化的技术栈,代码结构清晰,易于二次开发,适合中小型企业快速部署或开发者学习参考。

AI机器人的核心是自然语言理解能力。我们采用基于Transformer的预训练模型作为基础,结合业务场景进行微调。以下是核心处理流程的代码实现:
# nlp_engine.py
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from typing import Dict, List, Tuple
import json
class IntentClassifier(nn.Module):
"""意图分类器"""
def __init__(self, bert_model_name: str, num_intents: int):
super(IntentClassifier, self).__init__()
self.bert = BertModel.from_pretrained(bert_model_name)
self.dropout = nn.Dropout(0.3)
self.classifier = nn.Linear(self.bert.config.hidden_size, num_intents)
def forward(self, input_ids, attention_mask):
outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
pooled_output = outputs.pooler_output
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
return logits
class NLUEngine:
"""自然语言理解引擎"""
def __init__(self, model_path: str, intents_config: str):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
# 加载意图配置
with open(intents_config, 'r', encoding='utf-8') as f:
self.intents = json.load(f)
# 加载模型
self.model = IntentClassifier('bert-base-chinese', len(self.intents))
self.model.load_state_dict(torch.load(model_path, map_location=self.device))
self.model.to(self.device)
self.model.eval()
def predict_intent(self, text: str) -> Dict:
"""预测用户意图"""
inputs = self.tokenizer(
text,
padding=True,
truncation=True,
max_length=128,
return_tensors="pt"
)
with torch.no_grad():
logits = self.model(
inputs['input_ids'].to(self.device),
inputs['attention_mask'].to(self.device)
)
probabilities = torch.softmax(logits, dim=-1)
predicted_idx = torch.argmax(probabilities, dim=-1).item()
return {
'intent': self.intents[predicted_idx]['name'],
'confidence': probabilities[0][predicted_idx].item(),
'entities': self.extract_entities(text)
}
def extract_entities(self, text: str) -> List[Dict]:
"""实体抽取"""
# 基于规则和模型的混合实体抽取
entities = []
# 时间实体识别
time_patterns = [
r'(\d{4}年\d{1,2}月\d{1,2}日)',
r'(\d{1,2}月\d{1,2}日)',
r'(\d{1,2}:\d{2})'
]
import re
for pattern in time_patterns:
matches = re.findall(pattern, text)
for match in matches:
entities.append({
'type': 'TIME',
'value': match,
'start': text.find(match),
'end': text.find(match) + len(match)
})
return entities
对话管理是AI机器人的大脑,负责维护对话状态和决策下一步动作:
# dialogue_manager.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional, List, Dict
import time
class DialogueState(Enum):
GREETING = "greeting"
ASKING_INTENT = "asking_intent"
PROCESSING = "processing"
CONFIRMING = "confirming"
COMPLETED = "completed"
ESCALATED = "escalated"
@dataclass
class DialogueContext:
"""对话上下文"""
session_id: str
current_state: DialogueState
user_intent: Optional[str] = None
entities: List[Dict] = None
history: List[Dict] = None
start_time: float = None
last_active: float = None
def __post_init__(self):
if self.entities is None:
self.entities = []
if self.history is None:
self.history = []
if self.start_time is None:
self.start_time = time.time()
self.last_active = time.time()
class DialogueManager:
"""对话管理器"""
def __init__(self, nlu_engine, knowledge_base):
self.nlu_engine = nlu_engine
self.knowledge_base = knowledge_base
self.sessions: Dict[str, DialogueContext] = {}
def process_message(self, session_id: str, user_input: str) -> Dict:
"""处理用户输入"""
# 获取或创建会话上下文
if session_id not in self.sessions:
context = DialogueContext(
session_id=session_id,
current_state=DialogueState.GREETING
)
self.sessions[session_id] = context
else:
context = self.sessions[session_id]
# 更新活跃时间
context.last_active = time.time()
# 自然语言理解
nlu_result = self.nlu_engine.predict_intent(user_input)
context.user_intent = nlu_result['intent']
context.entities = nlu_result['entities']
# 根据当前状态和意图决定下一步
response = self._decide_response(context, nlu_result)
# 更新对话历史
context.history.append({
'user': user_input,
'bot': response['text'],
'timestamp': time.time(),
'intent': nlu_result['intent'],
'confidence': nlu_result['confidence']
})
# 更新对话状态
context.current_state = response.get('next_state', context.current_state)
return response
def _decide_response(self, context: DialogueContext, nlu_result: Dict) -> Dict:
"""决策响应"""
intent = nlu_result['intent']
confidence = nlu_result['confidence']
# 根据意图和置信度决定响应策略
if confidence < 0.6:
return {
'text': '抱歉,我没有完全理解您的意思。您可以换种方式描述您的问题吗?',
'next_state': DialogueState.ASKING_INTENT,
'suggestions': ['产品咨询', '技术支持', '投诉建议', '订单查询']
}
# 查询知识库获取答案
if intent in ['product_query', 'technical_support', 'faq']:
answer = self.knowledge_base.query(intent, nlu_result['entities'])
if answer:
return {
'text': answer,
'next_state': DialogueState.COMPLETED,
'suggest_followup': True
}
else:
return {
'text': '这个问题我暂时无法回答,建议您联系人工客服或提交工单。',
'next_state': DialogueState.ESCALATED,
'actions': ['create_ticket', 'transfer_to_human']
}
# 处理工单相关意图
elif intent == 'create_ticket':
return {
'text': '我将帮您创建工单,请简要描述您的问题。',
'next_state': DialogueState.PROCESSING,
'form': {
'fields': ['title', 'description', 'priority', 'category']
}
}
# 默认响应
return {
'text': '您好!我是智能客服助手,请问有什么可以帮您?',
'next_state': DialogueState.GREETING
}
工单系统采用状态机模式管理工单生命周期,支持自定义工作流:
# models/ticket.py
from datetime import datetime
from enum import Enum
from typing import Optional, List
from sqlalchemy import Column, Integer, String, Text, DateTime, Enum as SQLEnum, ForeignKey
from sqlalchemy.orm import relationship
from database import Base
class TicketPriority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class TicketStatus(Enum):
NEW = "new"
OPEN = "open"
PENDING = "pending"
RESOLVED = "resolved"
CLOSED = "closed"
CANCELLED = "cancelled"
class TicketCategory(Enum):
TECHNICAL = "technical"
BILLING = "billing"
ACCOUNT = "account"
FEATURE_REQUEST = "feature_request"
BUG_REPORT = "bug_report"
class Ticket(Base):
"""工单模型"""
__tablename__ = "tickets"
id = Column(Integer, primary_key=True, index=True)
ticket_id = Column(String(50), unique=True, index=True, nullable=False)
title = Column(String(200), nullable=False)
description = Column(Text, nullable=False)
priority = Column(SQLEnum(TicketPriority), default=TicketPriority.MEDIUM)
status = Column(SQLEnum(TicketStatus), default=TicketStatus.NEW)
category = Column(SQLEnum(TicketCategory), nullable=False)
# 关联信息
customer_id = Column(Integer, ForeignKey("users.id"), nullable=False)
assignee_id = Column(Integer, ForeignKey("users.id"), nullable=True)
department_id = Column(Integer, ForeignKey("departments.id"), nullable=True)
# 时间戳
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
resolved_at = Column(DateTime, nullable=True)
due_date = Column(DateTime, nullable=True)
# SLA相关
first_response_time = Column(DateTime, nullable=True)
resolution_deadline = Column(DateTime, nullable=True)
# 关系
customer = relationship("User", foreign_keys=[customer_id])
assignee = relationship("User", foreign_keys=[assignee_id])
department = relationship("Department")
comments = relationship("TicketComment", back_populates="ticket")
attachments = relationship("TicketAttachment", back_populates="ticket")
activities = relationship("TicketActivity", back_populates="ticket")
def generate_ticket_id(self):
"""生成工单编号"""
from datetime import datetime
prefix = {
TicketCategory.TECHNICAL: "TECH",
TicketCategory.BILLING: "BILL",
TicketCategory.ACCOUNT: "ACC",
TicketCategory.FEATURE_REQUEST: "FR",
TicketCategory.BUG_REPORT: "BUG"
}.get(self.category, "TKT")
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
return f"{prefix}-{timestamp}-{self.id:06d}"
def calculate_sla_deadlines(self):
"""计算SLA截止时间"""
from datetime import datetime, timedelta
sla_rules = {
TicketPriority.URGENT: {
'first_response': timedelta(minutes=30),
'resolution': timedelta(hours=4)
},
TicketPriority.HIGH: {
'first_response': timedelta(hours=1),
'resolution': timedelta(hours=8)
},
TicketPriority.MEDIUM: {
'first_response': timedelta(hours=4),
'resolution': timedelta(days=2)
},
TicketPriority.LOW: {
'first_response': timedelta(hours=8),
'resolution': timedelta(days=5)
}
}
rule = sla_rules.get(self.priority, sla_rules[TicketPriority.MEDIUM])
self.first_response_time = datetime.utcnow() + rule['first_response']
self.resolution_deadline = datetime.utcnow() + rule['resolution']
# workflows/ticket_workflow.py
from enum import Enum
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Transition:
"""状态转移"""
from_state: str
to_state: str
condition: Optional[Callable] = None
action: Optional[Callable] = None
required_fields: List[str] = None
class TicketWorkflow:
"""工单工作流引擎"""
def __init__(self):
self.transitions: Dict[str, List[Transition]] = {}
self._initialize_workflow()
def _initialize_workflow(self):
"""初始化工作流规则"""
# 新建 -> 已打开
self.add_transition(
Transition(
from_state="new",
to_state="open",
condition=self._can_open_ticket,
action=self._on_ticket_open,
required_fields=["assignee_id"]
)
)
# 已打开 -> 处理中
self.add_transition(
Transition(
from_state="open",
to_state="pending",
condition=lambda t: t.assignee_id is not None,
action=self._on_ticket_pending
)
)
# 处理中 -> 已解决
self.add_transition(
Transition(
from_state="pending",
to_state="resolved",
condition=self._can_resolve_ticket,
action=self._on_ticket_resolved,
required_fields=["resolution_notes"]
)
)
# 已解决 -> 已关闭
self.add_transition(
Transition(
from_state="resolved",
to_state="closed",
condition=lambda t: True,
action=self._on_ticket_closed
)
)
# 重新打开工单
self.add_transition(
Transition(
from_state="closed",
to_state="open",
condition=self._can_reopen_ticket,
action=self._on_ticket_reopen,
required_fields=["reopen_reason"]
)
)
def add_transition(self, transition: Transition):
"""添加状态转移"""
if transition.from_state not in self.transitions:
self.transitions[transition.from_state] = []
self.transitions[transition.from_state].append(transition)
def can_transition(self, ticket, to_state: str, data: Dict = None) -> bool:
"""检查是否可以执行状态转移"""
current_state = ticket.status.value
if current_state not in self.transitions:
return False
for transition in self.transitions[current_state]:
if transition.to_state == to_state:
# 检查条件
if transition.condition and not transition.condition(ticket):
return False
# 检查必填字段
if transition.required_fields and data:
for field in transition.required_fields:
if field not in data or not data[field]:
return False
return True
return False
def transition(self, ticket, to_state: str, data: Dict = None) -> bool:
"""执行状态转移"""
if not self.can_transition(ticket, to_state, data):
return False
current_state = ticket.status.value
for transition in self.transitions[current_state]:
if transition.to_state == to_state:
# 执行动作
if transition.action:
transition.action(ticket, data)
# 更新状态
ticket.status = TicketStatus(to_state)
ticket.updated_at = datetime.utcnow()
# 记录活动日志
self._log_activity(ticket, current_state, to_state, data)
return True
return False
def _can_open_ticket(self, ticket) -> bool:
"""检查是否可以打开工单"""
return ticket.assignee_id is not None
def _on_ticket_open(self, ticket, data):
"""工单打开时的动作"""
ticket.first_response_time = datetime.utcnow()
def _can_resolve_ticket(self, ticket) -> bool:
"""检查是否可以解决工单"""
return ticket.assignee_id is not None
def _on_ticket_resolved(self, ticket, data):
"""工单解决时的动作"""
ticket.resolved_at = datetime.utcnow()
ticket.resolution_notes = data.get("resolution_notes", "")
def _can_reopen_ticket(self, ticket) -> bool:
"""检查是否可以重新打开工单"""
# 只有关闭时间在30天内的工单可以重新打开
if not ticket.resolved_at:
return False
days_since_resolved = (datetime.utcnow() - ticket.resolved_at).days
return days_since_resolved <= 30
def _log_activity(self, ticket, from_state, to_state, data):
"""记录活动日志"""
from models import TicketActivity
activity = TicketActivity(
ticket_id=ticket.id,
user_id=data.get("user_id") if data else None,
activity_type="status_change",
description=f"状态从 {from_state} 变更为 {to_state}",
metadata={
"from_state": from_state,
"to_state": to_state,
"data": data
}
)
# 保存到数据库
# db.session.add(activity)
# db.session.commit()
# i18n/translator.py
import json
import os
from typing import Dict, Optional, List
from pathlib import Path
import gettext
import babel
from babel.support import Translations
class I18nManager:
"""国际化管理器"""
def __init__(self, locales_dir: str = "locales", default_language: str = "zh_CN"):
self.locales_dir = Path(locales_dir)
self.default_language = default_language
self.supported_languages = self._discover_languages()
self.translations: Dict[str, Translations] = {}
self._load_translations()
def _discover_languages(self) -> List[str]:
"""发现支持的语言"""
languages = []
if self.locales_dir.exists():
for item in self.locales_dir.iterdir():
if item.is_dir():
languages.append(item.name)
return languages
def _load_translations(self):
"""加载翻译文件"""
for lang in self.supported_languages:
mo_file = self.locales_dir / lang / "LC_MESSAGES" / "messages.mo"
if mo_file.exists():
with open(mo_file, 'rb') as f:
self.translations[lang] = Translations(f, domain="messages")
def gettext(self, text: str, language: Optional[str] = None) -> str:
"""获取翻译文本"""
lang = language or self.default_language
if lang in self.translations:
translation = self.translations[lang].gettext(text)
return translation if translation != text else text
return text
def ngettext(self, singular: str, plural: str, n: int, language: Optional[str] = None) -> str:
"""获取复数形式翻译"""
lang = language or self.default_language
if lang in self.translations:
return self.translations[lang].ngettext(singular, plural, n)
return singular if n == 1 else plural
def format_datetime(self, dt, language: Optional[str] = None, format: str = "medium") -> str:
"""格式化日期时间"""
lang = language or self.default_language
locale = babel.Locale.parse(lang.replace('_', '-'))
if format == "full":
return babel.dates.format_datetime(dt, format="full", locale=locale)
elif format == "long":
return babel.dates.format_datetime(dt, format="long", locale=locale)
elif format == "medium":
return babel.dates.format_datetime(dt, format="medium", locale=locale)
else:
return babel.dates.format_datetime(dt, format="short", locale=locale)
def format_number(self, number, language: Optional[str] = None) -> str:
"""格式化数字"""
lang = language or self.default_language
locale = babel.Locale.parse(lang.replace('_', '-'))
return babel.numbers.format_number(number, locale=locale)
def format_currency(self, amount, currency: str, language: Optional[str] = None) -> str:
"""格式化货币"""
lang = language or self.default_language
locale = babel.Locale.parse(lang.replace('_', '-'))
return babel.numbers.format_currency(amount, currency, locale=locale)
# 前端多语言支持示例
class FrontendI18n:
"""前端国际化"""
def __init__(self):
self.translations = {}
self.current_language = 'zh_CN'
async def load_translations(self, language: str):
"""加载翻译文件"""
try:
response = await fetch(`/api/translations/${language}`)
self.translations[language] = await response.json()
self.current_language = language
this._update_ui()
} catch (error) {
console.error(`Failed to load translations for ${language}:`, error)
}
function t(key, params = {}) {
"""翻译文本"""
const lang_data = this.translations[this.current_language] || {}
let text = lang_data[key] || key
// 替换参数
Object.keys(params).forEach(param => {
text = text.replace(`{${param}}`, params[param])
})
return text
}
function _update_ui() {
"""更新UI文本"""
// 更新所有带有data-i18n属性的元素
document.querySelectorAll('[data-i18n]').forEach(element => {
const key = element.getAttribute('data-i18n')
const params = JSON.parse(element.getAttribute('data-i18n-params') || '{}')
element.textContent = this.t(key, params)
})
// 更新所有带有data-i18n-placeholder属性的输入框
document.querySelectorAll('[data-i18n-placeholder]').forEach(element => {
const key = element.getAttribute('data-i18n-placeholder')
element.placeholder = this.t(key)
})
// 更新所有带有data-i18n-title属性的元素
document.querySelectorAll('[data-i18n-title]').forEach(element => {
const key = element.getAttribute('data-i18n-title')
element.title = this.t(key)
})
}-- 多语言支持的数据库设计
CREATE TABLE languages (
id SERIAL PRIMARY KEY,
code VARCHAR(10) NOT NULL UNIQUE, -- 如: zh_CN, en_US
name VARCHAR(50) NOT NULL, -- 如: 简体中文, English
native_name VARCHAR(50) NOT NULL, -- 如: 中文, English
is_active BOOLEAN DEFAULT TRUE,
is_default BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE translation_keys (
id SERIAL PRIMARY KEY,
key VARCHAR(255) NOT NULL UNIQUE,
module VARCHAR(50) NOT NULL, -- 模块名: common, ticket, chat, etc.
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE translations (
id SERIAL PRIMARY KEY,
key_id INTEGER NOT NULL REFERENCES translation_keys(id),
language_id INTEGER NOT NULL REFERENCES languages(id),
value TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(key_id, language_id)
);
-- 多语言内容表设计示例
CREATE TABLE articles (
id SERIAL PRIMARY KEY,
slug VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE article_translations (
id SERIAL PRIMARY KEY,
article_id INTEGER NOT NULL REFERENCES articles(id),
language_id INTEGER NOT NULL REFERENCES languages(id),
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
excerpt TEXT,
meta_title VARCHAR(255),
meta_description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(article_id, language_id)
);系统采用微服务架构,各模块独立部署,通过API网关进行通信:
# docker-compose.yml 示例
version: '3.8'
services:
# API网关
api-gateway:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/conf.d:/etc/nginx/conf.d
- ./ssl:/etc/nginx/ssl
depends_on:
- auth-service
- chat-service
- ticket-service
- ai-service
# 认证服务
auth-service:
build: ./services/auth
environment:
- DATABASE_URL=postgresql://user:password@postgres:5432/auth_db
- REDIS_URL=redis://redis:6379/0
- JWT_SECRET=${JWT_SECRET}
ports:
- "3001:3000"
# 聊天服务
chat-service:
build: ./services/chat
environment:
- DATABASE_URL=postgresql://user:password@postgres:5432/chat_db
- REDIS_URL=redis://redis:6379/1
- WEBSOCKET_PORT=3002
ports:
- "3002:3002"
# 工单服务
ticket-service:
build: ./services/ticket
environment:
- DATABASE_URL=postgresql://user:password@postgres:5432/ticket_db
- REDIS_URL=redis://redis:6379/2
ports:
- "3003:3000"
# AI服务
ai-service:
build: ./services/ai
environment:
- MODEL_PATH=/app/models/bert-base-chinese
- INTENTS_CONFIG=/app/config/intents.json
- KNOWLEDGE_BASE_URL=http://knowledge-base:3000
ports:
- "3004:3000"
# 数据库
postgres:
image: postgres:14-alpine
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=customer_service
volumes:
- postgres_data:/var/lib/postgresql/data
# Redis缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:# services/chat/websocket_server.py
import asyncio
import json
import logging
from typing import Dict, Set
from datetime import datetime
import websockets
from websockets.server import WebSocketServerProtocol
class ChatWebSocketServer:
"""WebSocket聊天服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 3002):
self.host = host
self.port = port
self.connections: Dict[str, Set[WebSocketServerProtocol]] = {}
self.user_sockets: Dict[str, WebSocketServerProtocol] = {}
async def handle_connection(self, websocket: WebSocketServerProtocol, path: str):
"""处理WebSocket连接"""
try:
# 验证连接
user_id = await self.authenticate(websocket)
if not user_id:
await websocket.close(code=4001, reason="Authentication failed")
return
# 注册连接
await self.register_connection(user_id, websocket)
# 处理消息
async for message in websocket:
await self.handle_message(user_id, message, websocket)
except websockets.exceptions.ConnectionClosed:
logging.info(f"Connection closed for user")
except Exception as e:
logging.error(f"Error handling connection: {e}")
finally:
# 清理连接
await self.unregister_connection(user_id, websocket)
async def authenticate(self, websocket: WebSocketServerProtocol) -> str:
"""验证用户身份"""
try:
# 从查询参数获取token
query_params = dict(param.split('=') for param in websocket.path.split('?')[1].split('&'))
token = query_params.get('token')
if not token:
return None
# 验证JWT token
import jwt
from config import JWT_SECRET
payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
return payload.get('user_id')
except Exception as e:
logging.error(f"Authentication error: {e}")
return None
async def register_connection(self, user_id: str, websocket: WebSocketServerProtocol):
"""注册连接"""
if user_id not in self.connections:
self.connections[user_id] = set()
self.connections[user_id].add(websocket)
self.user_sockets[user_id] = websocket
# 通知用户上线
await self.broadcast_to_user(user_id, {
'type': 'presence',
'user_id': user_id,
'status': 'online',
'timestamp': datetime.utcnow().isoformat()
})
async def handle_message(self, user_id: str, message: str, websocket: WebSocketServerProtocol):
"""处理消息"""
try:
data = json.loads(message)
message_type = data.get('type')
if message_type == 'chat_message':
await self.handle_chat_message(user_id, data)
elif message_type == 'typing_indicator':
await self.handle_typing_indicator(user_id, data)
elif message_type == 'read_receipt':
await self.handle_read_receipt(user_id, data)
elif message_type == 'file_upload':
await self.handle_file_upload(user_id, data)
else:
logging.warning(f"Unknown message type: {message_type}")
except json.JSONDecodeError:
logging.error(f"Invalid JSON message: {message}")
except Exception as e:
logging.error(f"Error handling message: {e}")
async def handle_chat_message(self, sender_id: str, data: Dict):
"""处理聊天消息"""
message = {
'id': self.generate_message_id(),
'type': 'chat_message',
'sender_id': sender_id,
'recipient_id': data['recipient_id'],
'content': data['content'],
'content_type': data.get('content_type', 'text'),
'timestamp': datetime.utcnow().isoformat(),
'status': 'sent'
}
# 保存到数据库
await self.save_message_to_db(message)
# 转发给接收者
recipient_id = data['recipient_id']
if recipient_id in self.connections:
await self.send_to_user(recipient_id, message)
# 发送回执给发送者
await self.send_to_user(sender_id, {
'type': 'message_delivered',
'message_id': message['id'],
'timestamp': datetime.utcnow().isoformat()
})
async def send_to_user(self, user_id: str, message: Dict):
"""发送消息给指定用户"""
if user_id in self.connections:
for websocket in self.connections[user_id]:
try:
await websocket.send(json.dumps(message))
except Exception as e:
logging.error(f"Error sending message to user {user_id}: {e}")
async def broadcast_to_user(self, user_id: str, message: Dict):
"""广播消息给用户的所有连接"""
await self.send_to_user(user_id, message)
def generate_message_id(self) -> str:
"""生成消息ID"""
import uuid
return str(uuid.uuid4())
async def save_message_to_db(self, message: Dict):
"""保存消息到数据库"""
# 这里实现数据库保存逻辑
pass
async def unregister_connection(self, user_id: str, websocket: WebSocketServerProtocol):
"""注销连接"""
if user_id in self.connections:
self.connections[user_id].discard(websocket)
if not self.connections[user_id]:
del self.connections[user_id]
if user_id in self.user_sockets and self.user_sockets[user_id] == websocket:
del self.user_sockets[user_id]
async def run(self):
"""运行WebSocket服务器"""
async with websockets.serve(
self.handle_connection,
self.host,
self.port,
ping_interval=20,
ping_timeout=60
):
logging.info(f"WebSocket server started on ws://{self.host}:{self.port}")
await asyncio.Future() # 永久运行# Dockerfile for AI Service
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 下载预训练模型
RUN python -c "from transformers import BertTokenizer, BertModel; \
BertTokenizer.from_pretrained('bert-base-chinese'); \
BertModel.from_pretrained('bert-base-chinese')"
# 创建非root用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 3000
# 启动命令
CMD ["gunicorn", "app:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:3000"]# .env 配置文件
# 数据库配置
DATABASE_URL=postgresql://user:password@postgres:5432/customer_service
REDIS_URL=redis://redis:6379/0
# JWT配置
JWT_SECRET=your-super-secret-jwt-key-change-in-production
JWT_ALGORITHM=HS256
JWT_EXPIRE_MINUTES=60
# AI模型配置
BERT_MODEL_NAME=bert-base-chinese
INTENTS_CONFIG_PATH=/app/config/intents.json
KNOWLEDGE_BASE_PATH=/app/data/knowledge_base.json
# 邮件配置
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your-email@gmail.com
SMTP_PASSWORD=your-app-password
EMAIL_FROM=noreply@yourdomain.com
# 文件存储
STORAGE_TYPE=s3 # local, s3, azure
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_S3_BUCKET=your-bucket-name
# 多语言配置
DEFAULT_LANGUAGE=zh_CN
SUPPORTED_LANGUAGES=zh_CN,en_US,ja_JP,ko_KR
LOCALES_DIR=/app/locales本文详细介绍了智能在线客服系统的完整实现方案,涵盖了AI机器人、工单系统和多语言支持三大核心模块。系统采用微服务架构,各模块可独立部署和扩展,具有良好的可维护性和可扩展性。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。