
Call Center AI 是一个基于 Azure 和 OpenAI 的智能呼叫中心解决方案。它允许你通过 API 发起由 AI 代理拨打的电话,或直接接听来自配置号码的来电。该项目旨在帮助开发者在几小时内(而非几周)为保险、IT支持、客户服务等场景定制和部署智能语音助手。
# 示例:让AI机器人拨打电话
data='{
"bot_company": "Contoso",
"bot_name": "Amélie",
"phone_number": "+11234567890",
"task": "帮助客户解决数字化工作场所问题。助手为IT支持部门工作,目标是帮助客户解决问题并收集工单信息。",
"agent_phone_number": "+33612345678",
"claim": [
{"name": "硬件信息", "type": "text"},
{"name": "首次发现时间", "type": "datetime"},
{"name": "建筑位置", "type": "text"}
]
}'
curl \
--header 'Content-Type: application/json' \
--request POST \
--url https://your-instance/call \
--data "$data"CONFIG_JSON 的环境变量中(JSON格式)。通过API端点 /call 发起一个POST请求,即可让AI机器人主动呼叫指定的电话号码。
import requests
import json
url = "https://your-instance/call"
payload = {
"bot_company": "Contoso",
"bot_name": "Amélie",
"phone_number": "+11234567890",
"task": "帮助客户解决网络连接问题。",
"agent_phone_number": "+33612345678", # 人工坐席号码,用于转接
"claim": [ # 需要收集的工单信息
{"name": "用户ID", "type": "text"},
{"name": "问题描述", "type": "text"}
]
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, headers=headers, data=json.dumps(payload))
print(response.json())将你在Azure Communication Services或Twilio中配置的电话号码指向你的服务端点。当有来电时,服务会自动应答,并启动AI助手与客户对话。
POST /call: 发起一个新的由AI代理拨打的电话。POST /events: 用于接收来自Azure Communication Services的通话事件回调(如通话已连接、已断开、DTMF识别结果等)。/ws: 用于与实时音频流进行双向通信,实现低延迟的语音交互。POST /sms: 用于接收SMS消息的回调端点。app/main.py - 片段)这是FastAPI应用的入口,定义了接收来电和事件回调的核心端点。
# 文件: app/main.py (片段)
from fastapi import FastAPI, Request
# ... 其他导入
app = FastAPI()
@app.post("/events")
async def events(request: Request) -> JSONResponse:
"""
处理来自 Azure Communication Services 的回调事件。
包括新呼入、通话已连接、已断开、DTMF识别等。
"""
# 解析云事件
cloud_events = await request.json()
for cloud_event in cloud_events:
event = CloudEvent.from_dict(cloud_event)
logger.info("收到事件: %s", event.type)
# 根据事件类型分发给不同的处理函数
if event.type == "Microsoft.Communication.CallLegStateChanged":
if event.data["state"] == "Connected":
# 处理通话已连接事件,开始媒体流
await on_call_connected(...)
elif event.type == "Microsoft.Communication.PlayCompleted":
await on_automation_play_completed(...)
# ... 其他事件处理
return JSONResponse(status_code=HTTPStatus.OK, content={})
@app.post("/call")
async def outbound_call(
request: OutboundCallRequest, # Pydantic模型,包含呼出参数
background_tasks: BackgroundTasks,
) -> JSONResponse:
"""
发起一个由AI代理拨打的呼出电话。
"""
# 创建通话状态模型
call_state = CallStateModel(
# ... 从请求中初始化
)
# 保存通话状态到数据库
await _db.call_create(call_state)
# 在后台任务中启动呼叫流程,避免阻塞API响应
background_tasks.add_task(
start_outbound_call,
callback_url=str(request.url_for("events")),
call=call_state,
# ...
)
return JSONResponse(status_code=HTTPStatus.ACCEPTED, content={"call_id": str(call_state.call_id)})app/helpers/call_llm.py - 片段)这是AI对话管理的核心,负责接收用户语音识别结果,调用LLM,并将LLM的响应转换为语音。
# 文件: app/helpers/call_llm.py (片段)
@start_as_current_span("call_load_llm_chat")
async def load_llm_chat(
audio_in: asyncio.Queue[bytes], # 从用户接收的音频流
audio_out: asyncio.Queue[bytes | bool], # 发送给用户的音频流
audio_sample_rate: int,
automation_client: CallAutomationClient,
call: CallStateModel,
post_callback: Callable[[CallStateModel], Awaitable[None]],
scheduler: Scheduler,
training_callback: Callable[[CallStateModel], Awaitable[None]],
) -> None:
"""
加载并运行LLM聊天循环。从audio_in队列获取用户的语音输入,
将其转换为文本,发送给LLM,然后将LLM的文本响应转换为语音放入audio_out队列。
"""
# 初始化语音识别(STT)和语音合成(TTS)客户端
async with SttClient(call=call, sample_rate=audio_sample_rate) as stt_client, \
use_tts_client(call.locale) as tts_synthesizer:
# 注册TTS回调,将合成的音频放入输出队列
def tts_callback(text: str):
asyncio.create_task(handle_realtime_tts(text, tts_synthesizer, audio_out))
# 初始化LLM工具插件
tools_plugin = DefaultPlugin(
call=call,
client=automation_client,
post_callback=post_callback,
scheduler=scheduler,
tts_callback=tts_callback,
tts_client=tts_synthesizer,
)
# 创建LLM对话消息历史
messages = await _init_messages(call)
messages.append(MessageModel(persona=PersonaEnum.ASSISTANT, text=call.greeting))
# 主对话循环
while True:
# 等待并获取用户的下一条语音消息
transcript = await stt_client.wait_for_transcript()
if transcript:
messages.append(MessageModel(persona=PersonaEnum.USER, text=transcript))
# 将对话历史发送给LLM获取回复
async for delta in completion_stream(
max_tokens=call.max_tokens,
messages=messages,
system=call.prompts,
tools=tools_plugin.tools_definitions,
):
# 处理LLM的流式响应,包括文本和工具调用
if delta.content:
# 将文本响应通过TTS播报出去
await tts_callback(delta.content)
if delta.tool_calls:
# 执行工具调用
await tools_plugin.execute_tool_call(delta.tool_calls[0])
# 将助手的最新回复添加到消息历史
# ...app/helpers/cache.py)项目使用自定义的LRU缓存装饰器来优化异步和同步函数的性能,减少重复计算和外部服务调用。
# 文件: app/helpers/cache.py
from collections import OrderedDict
from functools import wraps
import asyncio
from collections.abc import Awaitable
def lru_acache(maxsize: int = 128):
"""
异步函数的LRU缓存装饰器。
缓存函数的返回值。当达到最大大小时,最久未使用的缓存项会被移除。
"""
def decorator(func):
cache: OrderedDict[tuple, Awaitable] = OrderedDict()
@wraps(func)
async def wrapper(*args, **kwargs):
# 创建一个包含事件循环ID、位置参数和关键字参数的缓存键
key = (
id(asyncio.get_event_loop()),
args,
frozenset(kwargs.items()),
)
if key in cache:
# 如果缓存命中,将该键移动到末尾(表示最近使用)
cache.move_to_end(key)
return cache[key]
# 计算新值并存入缓存
value = await func(*args, **kwargs)
cache[key] = value
cache.move_to_end(key)
# 如果超出最大大小,移除最久未使用的项(第一个)
if len(cache) > maxsize:
cache.popitem(last=False)
return value
return wrapper
return decorator
# 同步函数的LRU缓存
def lru_cache(maxsize: int = 128):
"""同步函数的LRU缓存装饰器,原理同上。"""
# ... 实现
```FINISHEDAHFTVYTuX6S/IgIPsbXOvjn9f7SQ1VezSmjKy6Zrwto=
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。