
本文将带你通过一个实战案例——client_demo.py,深入了解如何使用 LiveKit Python SDK 构建我们将从环境搭建开始,逐步解析核心概念、架构设计以及关键功能的实现。
LiveKit 是一个开源的实时音视频(WebRTC)基础设施,它提供了简单易用的 SDK,让开发者能够轻松构建低延迟、高质量的音视频应用。
本实战将构建一个基于 Python 的控制台 + 窗口界面的视频客户端,具备以下核心功能:
在开始之前,请确保你已经安装了 Python 环境(建议 3.9+)。
项目主要依赖 livekit SDK 和 opencv-python(用于视频处理和显示)。
pip3 install livekit python-dotenv opencv-python numpy在项目根目录下创建一个 .env 文件,配置连接 LiveKit 服务器所需的凭据:
LIVEKIT_URL=wss://your-project.livekit.cloud # 你的 LiveKit 服务器地址
LIVEKIT_API_KEY=API... # 你的 API Key
LIVEKIT_API_SECRET=Secret... # 你的 API Secret我们的 client_demo.py 采用了基于 asyncio 的异步架构,并通过 LiveKitClientApp 类封装了所有逻辑。主要组件包括:
run): 负责初始化、连接和资源清理。_ui_worker): 专门负责 OpenCV 窗口的绘制和显示,确保界面响应流畅。_video_capture_worker): 负责从摄像头读取帧并推送到 LiveKit。_video_render_worker): 负责接收远程视频流并解码为图像数据。首先,我们需要初始化 Room 对象,这是与 LiveKit 交互的核心入口。
class LiveKitClientApp:
def __init__(self):
# 初始化 Room 对象
self.room = rtc.Room()
# 控制应用生命周期的事件
self.stop_event = asyncio.Event()
# ... (配置加载和状态初始化)
# 注册事件回调
self._register_event_handlers()连接过程在 _connect_room 方法中实现。值得注意的是,如果本地开发环境提供了 API Key/Secret,我们可以直接在客户端生成 Token(注意:生产环境中 Token 应始终由后端服务器签发)。
async def _connect_room(self):
# ... (Token 生成逻辑)
if not self.token:
self.token = api.AccessToken(self.api_key, self.api_secret) \
.with_identity("python-client-demo") \
.with_grants(api.VideoGrants(room_join=True, room="client-demo-room")) \
.to_jwt()
logger.info(f"Connecting to {self.url}...")
await self.room.connect(self.url, self.token)
logger.info(f"Connected to room: {self.room.name}")SDK 的 rtc.MediaDevices 提供了列出音视频设备的能力。我们实现了一个交互式的 _setup_devices 流程,允许用户在启动时选择设备。
async def _setup_devices(self):
media_devices = rtc.MediaDevices()
# 获取并选择麦克风
self.mic_index = self._get_device_selection(media_devices.list_input_devices(), "Microphone")
# 获取并选择扬声器
self.speaker_index = self._get_device_selection(media_devices.list_output_devices(), "Speaker")
# ...为了展示 LiveKit 的灵活性,我们没有使用 SDK 默认的摄像头采集,而是自己通过 OpenCV 读取摄像头数据,这使得我们可以在发送视频前对其进行处理(例如美颜、背景模糊)。
逻辑在 _video_capture_worker 中:
cv2.VideoCapture 读取帧(BGR 格式)。rtc.VideoFrame。rtc.VideoSource.capture_frame 发送。async def _video_capture_worker(self, source: rtc.VideoSource, cap: cv2.VideoCapture):
while not self.stop_event.is_set():
# 在线程池中执行阻塞的 read 操作
ret, frame = await asyncio.to_thread(cap.read)
if not ret: continue
# 格式转换 BGR -> RGBA
rgba_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
height, width, _ = rgba_frame.shape
# 创建 LiveKit 视频帧并发送
lk_frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.RGBA, rgba_frame.tobytes())
source.capture_frame(lk_frame)
# 保存本地预览帧
self.video_state["local_frame"] = frame
await asyncio.sleep(0.033) # 约 30fps发布时,我们需要创建一个 LocalVideoTrack 并关联这个 source:
video_source = rtc.VideoSource(w, h)
track = rtc.LocalVideoTrack.create_video_track("camera", video_source)
await self.room.local_participant.publish_track(track)当房间内有其他用户发布视频时,会触发 track_subscribed 事件。我们需要为每个视频轨道启动一个渲染任务。
@self.room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
if track.kind == rtc.TrackKind.KIND_VIDEO:
video_stream = rtc.VideoStream(track)
task = asyncio.create_task(self._video_render_worker(video_stream, participant.identity))
self.render_tasks[participant.identity] = task_video_render_worker 是一个异步生成器消费者,它不断从 VideoStream 获取帧,并将其转换为 OpenCV 可显示的 BGR 格式:
async def _video_render_worker(self, stream, participant_identity):
async for frame_event in stream:
frame = frame_event.frame
# 确保格式为 RGBA
if frame.type != rtc.VideoBufferType.RGBA:
frame = frame.convert(rtc.VideoBufferType.RGBA)
# 转换为 numpy 数组并转回 BGR
arr = np.frombuffer(frame.data, dtype=np.uint8).reshape(frame.height, frame.width, 4)
bgr_frame = cv2.cvtColor(arr, cv2.COLOR_RGBA2BGR)
# 更新全局状态供 UI 显示
self.video_state["remote_frame"] = bgr_frameUI 线程 (_ui_worker) 独立于网络和采集线程运行,它从 self.video_state 读取最新的帧并合成显示。我们实现了一个简单的画中画 (PiP) 逻辑:
if remote_frame is not None:
display_frame = remote_frame.copy()
if local_frame is not None:
# 计算缩放和位置,叠加本地画面
# ... (详见源码 _ui_worker 方法)
display_frame[y_off:y_off+new_h, x_off:x_off+new_w] = resized_local通过这个实战案例,我们掌握了 LiveKit Python SDK 的几个关键点:
asyncio 协调 UI、网络 IO 和视频处理任务。import asyncio
import logging
import os
import sys
from typing import Optional, Dict
import cv2
import numpy as np
from dotenv import load_dotenv
from livekit import api, rtc
# 加载环境变量
load_dotenv()
# 配置日志系统
# 使用基本的日志配置,输出时间、日志级别和消息内容,便于调试和监控
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("LiveKitClient")
class LiveKitClientApp:
"""
LiveKit 客户端演示应用类。
该类演示了如何使用 LiveKit Python SDK 构建一个简单的视频会议客户端。
主要功能包括:
1. 设备管理:选择麦克风、扬声器和摄像头。
2. 房间连接:连接到 LiveKit 服务器。
3. 媒体发布:发布本地音频和视频轨道。
4. 视频渲染:使用 OpenCV 渲染远程和本地视频流。
5. 状态管理:处理多线程/协程间的状态同步。
"""
def __init__(self):
# 初始化 Room 对象,这是 LiveKit SDK 的主要入口点
self.room = rtc.Room()
# 用于控制应用退出流的事件信号
self.stop_event = asyncio.Event()
# 房间配置信息
# 从环境变量中读取连接所需的 URL 和认证凭据
self.url = os.getenv("LIVEKIT_URL")
self.token = os.getenv("LIVEKIT_TOKEN")
self.api_key = os.getenv("LIVEKIT_API_KEY")
self.api_secret = os.getenv("LIVEKIT_API_SECRET")
# 状态管理
# video_state 用于在不同协程间共享视频帧数据
# OpenCV 使用 BGR 格式,而 LiveKit 内部通常处理 RGBA 或 YUV,因此需要明确注释
self.video_state = {
"local_frame": None, # 本地摄像头捕获的帧 (OpenCV 格式: numpy array, BGR)
"remote_frame": None, # 远程接收到的帧 (OpenCV 格式: numpy array, BGR)
"remote_identity": None # 当前显示的远程用户标识 (str)
}
# 存储视频渲染任务,以便在用户断开或取消订阅时清理任务
self.render_tasks: Dict[str, asyncio.Task] = {}
self.audio_status = {"mic": False, "speaker": False}
# 设备索引
# 用于存储用户选择的音频输入、输出和摄像头设备 ID
self.mic_index: Optional[int] = None
self.speaker_index: Optional[int] = None
self.cam_index: Optional[int] = None
# 注册事件处理器
# LiveKit 是事件驱动的,我们需要监听各种房间事件(如连接、订阅等)
self._register_event_handlers()
def _register_event_handlers(self):
@self.room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
logger.info(f"Participant connected: {participant.identity} ({participant.name})")
@self.room.on("participant_disconnected")
def on_participant_disconnected(participant: rtc.RemoteParticipant):
logger.info(f"Participant disconnected: {participant.identity}")
if self.video_state['remote_identity'] == participant.identity:
self.video_state['remote_frame'] = None
self.video_state['remote_identity'] = None
@self.room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
# 当本地成功订阅到远程用户的轨道时触发
kind_str = "AUDIO" if track.kind == rtc.TrackKind.KIND_AUDIO else "VIDEO"
logger.info(f"已订阅轨道: {kind_str} 来自 {participant.identity} ({track.sid})")
if track.kind == rtc.TrackKind.KIND_AUDIO:
# 音频轨道处理:
# 通常 Python SDK 会自动处理音频播放(通过系统默认输出)。
# 如果需要控制特定输出设备或进行音频分析,可以处理原始音频流。
# 在这个演示中,我们主要依赖默认播放行为。
if self.audio_status["speaker"]:
# 注意:在 Python SDK 中指定特定扬声器设备输出需要更底层的音频库配合,
# 这里我们暂且假设使用系统默认或已选定设备。
pass
# 创建异步任务来管理音频生命周期(如需要)
loop = asyncio.get_event_loop()
loop.create_task(self._handle_audio_track(track))
elif track.kind == rtc.TrackKind.KIND_VIDEO:
# 视频轨道处理:
# 我们需要创建一个 VideoStream 来接收视频帧。
# 然后启动一个后台 worker (渲染任务) 将帧转换为 OpenCV 格式并显示。
video_stream = rtc.VideoStream(track)
task = asyncio.create_task(self._video_render_worker(video_stream, participant.identity))
self.render_tasks[participant.identity] = task
@self.room.on("track_unsubscribed")
def on_track_unsubscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
kind_str = "AUDIO" if track.kind == rtc.TrackKind.KIND_AUDIO else "VIDEO"
logger.info(f"Track unsubscribed: {kind_str} from {participant.identity}")
if track.kind == rtc.TrackKind.KIND_VIDEO:
if participant.identity in self.render_tasks:
self.render_tasks[participant.identity].cancel()
del self.render_tasks[participant.identity]
if self.video_state['remote_identity'] == participant.identity:
self.video_state['remote_frame'] = None
self.video_state['remote_identity'] = None
async def _handle_audio_track(self, track: rtc.Track):
"""
辅助方法:管理音频轨道生命周期。
"""
# 在大多数情况下,如果是简单的播放,SDK 或底层音频系统会自动处理。
# 如果需要对音频数据进行处理(如转录、分析),可以使用 AudioStream 读取 PCM 数据。
# 本演示主要聚焦于视频渲染逻辑,因此此处仅作为占位符。
pass
def _get_device_selection(self, devices: list[dict], device_type: str) -> Optional[int]:
"""
在控制台显示设备列表并让用户进行交互式选择。
返回所选设备的索引,如果跳过则返回 None。
"""
print(f"\nAvailable {device_type} devices:")
for d in devices:
print(f" [{d['index']}] {d['name']}")
while True:
choice = input(f"Select {device_type} device index (or press Enter to skip): ").strip()
if not choice:
return None
try:
idx = int(choice)
if any(d['index'] == idx for d in devices):
return idx
print("Invalid index. Please try again.")
except ValueError:
print("Invalid input. Please enter a number or press Enter.")
async def _setup_devices(self):
print("\n--- Device Selection ---")
media_devices = rtc.MediaDevices()
# 获取并选择音频输入设备 (麦克风)
input_devices = media_devices.list_input_devices()
self.mic_index = self._get_device_selection(input_devices, "Microphone")
# 获取并选择音频输出设备 (扬声器)
output_devices = media_devices.list_output_devices()
self.speaker_index = self._get_device_selection(output_devices, "Speaker")
# 选择视频输入设备 (摄像头)
cam_choice = input("\nSelect Camera Index (0, 1...) or Enter to skip: ").strip()
if cam_choice:
try:
self.cam_index = int(cam_choice)
except ValueError:
logger.warning("Invalid camera index, skipping video.")
async def _connect_room(self):
if not self.url:
logger.error("LIVEKIT_URL is not set")
sys.exit(1)
# 如果没有提供 Token 但提供了 API Key/Secret,则自动生成 Token
if not self.token:
if not self.api_key or not self.api_secret:
logger.error("必须设置 LIVEKIT_TOKEN 或 (LIVEKIT_API_KEY 和 LIVEKIT_API_SECRET)")
sys.exit(1)
logger.info("正在生成访问令牌 (Access Token)...")
# 创建 AccessToken,包含用户身份、名称以及房间权限 (Grants)
# 在生产环境中,Token 通常由后端服务器生成并分发给客户端
self.token = (
api.AccessToken(self.api_key, self.api_secret)
.with_identity("python-client-demo")
.with_name("Python Demo User")
.with_grants(
api.VideoGrants(
room_join=True,
room="client-demo-room",
)
)
.to_jwt()
)
logger.info(f"Connecting to {self.url}...")
try:
await self.room.connect(self.url, self.token)
logger.info(f"Connected to room: {self.room.name}")
logger.info(f"My Identity: {self.room.local_participant.identity}")
except Exception as e:
logger.error(f"Failed to connect: {e}")
sys.exit(1)
async def _publish_tracks(self):
# 发布音频
if self.mic_index is not None:
try:
# 使用 SDK 提供的便捷方法创建本地音频轨道。
# 这里的 "mic" 是轨道名称。rtc.AudioCaptureOptions() 使用默认音频捕获设置。
# 在更复杂的场景中,你可能需要配置回声消除、降噪等参数。
audio_track = rtc.LocalAudioTrack.create_audio_track("mic", rtc.AudioCaptureOptions())
await self.room.local_participant.publish_track(audio_track)
self.audio_status["mic"] = True
logger.info("已发布麦克风音频轨道")
except Exception as e:
logger.error(f"发布音频失败: {e}")
# 发布视频
if self.cam_index is not None:
# 视频发布逻辑:
# 我们不使用 SDK 的默认视频捕获,而是使用自定义的 OpenCV 捕获 worker。
# 这是为了演示如何将处理过(或仅仅是读取)的 OpenCV 帧输入到 LiveKit 的 VideoSource 中。
# 具体逻辑在 run() 方法中通过启动 _video_capture_worker 来实现。
pass # 实际处理逻辑在 run() 方法中通过 _video_capture_worker 完成
async def _video_capture_worker(self, source: rtc.VideoSource, cap: cv2.VideoCapture):
"""
视频捕获工作协程:
负责不断从摄像头读取帧,转换格式,并推送给 LiveKit SDK 进行发送。
同时更新本地预览状态。
"""
logger.info("启动本地视频捕获 worker...")
while not self.stop_event.is_set():
# 使用 asyncio.to_thread 在独立线程中进行阻塞式的 OpenCV 读取操作,避免阻塞异步事件循环
ret, frame = await asyncio.to_thread(cap.read)
if not ret:
logger.warning("无法从摄像头读取画面")
await asyncio.sleep(1)
continue
# 格式转换:OpenCV 默认使用 BGR,而 LiveKit 需要 RGBA 格式
rgba_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
height, width, _ = rgba_frame.shape
# 创建 LiveKit 视频帧对象并推送到 VideoSource
# VideoSource 是自定义视频源的入口,将数据编码并通过网络发送
lk_frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.RGBA, rgba_frame.tobytes())
source.capture_frame(lk_frame)
# 更新本地预览帧(保持 BGR 格式以便 OpenCV 显示)
self.video_state["local_frame"] = frame
# 控制帧率,~30fps (1/30 ≈ 0.033s)
await asyncio.sleep(0.033)
async def _video_render_worker(self, stream: rtc.VideoStream, participant_identity: str):
"""
视频渲染工作协程:
负责消耗 VideoStream 中的视频帧,将其转换为 OpenCV 可用的格式并更新共享状态。
"""
logger.info(f"开始渲染用户 {participant_identity} 的视频")
try:
# stream 是一个异步迭代器,不断产生 VideoFrameEvent
async for frame_event in stream:
if self.stop_event.is_set():
break
frame = frame_event.frame
# 确保帧格式为 RGBA,如果不是则进行转换
# 这是因为接收到的帧可能是 I420 等其他 YUV 格式
if frame.type != rtc.VideoBufferType.RGBA:
frame = frame.convert(rtc.VideoBufferType.RGBA)
# 将字节数据转换为 Numpy 数组
arr = np.frombuffer(frame.data, dtype=np.uint8).reshape(frame.height, frame.width, 4)
# 将 RGBA 转换回 BGR 以便 OpenCV `imshow` 正确显示
bgr_frame = cv2.cvtColor(arr, cv2.COLOR_RGBA2BGR)
# 更新共享状态供 UI 线程读取显示
self.video_state["remote_frame"] = bgr_frame
self.video_state["remote_identity"] = participant_identity
except asyncio.CancelledError:
logger.info(f"用户 {participant_identity} 的渲染任务已取消")
except Exception as e:
logger.error(f"渲染用户 {participant_identity} 视频时发生错误: {e}")
async def _ui_worker(self):
logger.info("启动 UI worker...")
window_name = "LiveKit Client Demo"
cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)
while not self.stop_event.is_set():
remote_frame = self.video_state["remote_frame"]
local_frame = self.video_state["local_frame"]
display_frame = None
if remote_frame is not None:
# 情况 1: 有远程视频。将远程视频作为主画面。
display_frame = remote_frame.copy()
# 画中画 (PiP) 逻辑: 如果有本地视频,将其叠加在右下角
if local_frame is not None:
rh, rw, _ = display_frame.shape
lh, lw, _ = local_frame.shape
# 缩放本地画面至远程画面宽度的 25%
scale = 0.25
new_w = int(rw * scale)
# 保持宽高比计算高度
new_h = int(new_w * (lh / lw)) if lw > 0 else int(new_w * 0.75)
resized_local = cv2.resize(local_frame, (new_w, new_h))
# 计算位置: 右下角,带 20px 边距
padding = 20
y_off = rh - new_h - padding
x_off = rw - new_w - padding
# 确保叠加位置在画面范围内
if y_off >= 0 and x_off >= 0:
display_frame[y_off:y_off+new_h, x_off:x_off+new_w] = resized_local
elif local_frame is not None:
# 情况 2: 只有本地视频 (未连接他人或他人未开启视频)
display_frame = local_frame.copy()
else:
# 情况 3: 无视频信号,显示等待提示 (黑色背景)
display_frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(display_frame, "等待视频信号...", (50, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.imshow(window_name, display_frame)
# 退出检测
# 按 'q' 键退出循环。waitKey(30) 等待 30ms 键盘输入,同时让窗口有机会刷新。
if cv2.waitKey(30) & 0xFF == ord('q'):
logger.info("检测到 UI 退出指令")
self.stop_event.set()
break
await asyncio.sleep(0.01)
cv2.destroyAllWindows()
async def run(self):
try:
# 1. 设备设置:选择麦克风、扬声器和摄像头
await self._setup_devices()
# 2. 连接房间:建立与 LiveKit 服务器的 WebSocket 连接
await self._connect_room()
# 3. 发布音频:简单的麦克风采集发布
await self._publish_tracks()
# 4. 发布视频并启动后台任务:
# 初始化摄像头,创建 VideoSource,并启动负责视频采集和 UI 渲染的后台协程
tasks = []
# UI 任务:负责 OpenCV 窗口的显示与刷新
tasks.append(asyncio.create_task(self._ui_worker()))
# 视频捕获任务 (如果选择了摄像头)
cap = None
if self.cam_index is not None:
cap = cv2.VideoCapture(self.cam_index)
if cap.isOpened():
ret, frame = cap.read()
if ret:
h, w, _ = frame.shape
video_source = rtc.VideoSource(w, h)
track = rtc.LocalVideoTrack.create_video_track("camera", video_source)
await self.room.local_participant.publish_track(track)
logger.info(f"Published camera track {w}x{h}")
tasks.append(asyncio.create_task(self._video_capture_worker(video_source, cap)))
else:
logger.error("Failed to read from selected camera.")
else:
logger.error(f"Failed to open camera index {self.cam_index}")
print("\n--- 按 Ctrl+C 或在窗口中按 'q' 退出 ---")
# 等待停止事件触发 (由 UI worker 或 KeyboardInterrupt 触发)
await self.stop_event.wait()
# 清理资源
logger.info("正在关闭...")
for t in tasks:
t.cancel()
# 等待任务清理完成
await asyncio.wait(tasks, timeout=1.0)
if cap:
cap.release()
await self.room.disconnect()
except asyncio.CancelledError:
pass
except KeyboardInterrupt:
self.stop_event.set()
finally:
if not self.stop_event.is_set():
self.stop_event.set()
await self.room.disconnect()
if __name__ == "__main__":
# 检查 OpenCV 依赖是否安装
try:
import cv2
except ImportError:
print("Error: opencv-python is required. Please install: pip3 install opencv-python")
sys.exit(1)
app = LiveKitClientApp()
try:
asyncio.run(app.run())
except KeyboardInterrupt:
pass原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。