
内容来源:程度员老廖
项目 | 要求 |
|---|---|
操作系统 | Linux64位(推荐Ubuntu20.04+/CentOS7+) |
编译器 | g++支持C++11 |
依赖库 | protobuf >= 3.19.4, tinyxml |
构建工具 | make或cmake |
项目自带DevContainer配置,最简单的方式:
# 如果使用VS Code/Cursor,直接打开项目,选择"Reopen in Container"
# DevContainer会自动安装所有依赖项目源码领取:C++校招项目推荐:高性能协程+RPC项目,一个项目打通后端8大核心技术
第一步:安装protobuf
# 下载protobuf 3.19.4
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protobuf-cpp-3.19.4.tar.gz
tar xzf protobuf-cpp-3.19.4.tar.gz
cd protobuf-3.19.4
./configure
make -j$(nproc)
sudo make install
sudo ldconfig第二步:安装tinyxml
# 方式一:通过git克隆(推荐)
git clone git://git.code.sf.net/p/tinyxml/git tinyxml
cd tinyxml
# 编译所有源文件
make -j4
# 用ar命令将.o文件打包成静态库
ar cr libtinyxml.a *.o
# 安装库文件和头文件到系统路径
sudo cp libtinyxml.a /usr/lib/
sudo mkdir -p /usr/include/tinyxml
sudo cp *.h /usr/include/tinyxml
cd ..第三步:编译TinyRPC
cd tinyrpc
mkdir -p bin lib obj
# 生成protobuf桩文件
cd testcases
protoc --cpp_out=./ test_tinypb_server.proto
cd ..
# 编译(二选一)
# 方式一:makefile
make -j4
sudo make install
# 方式二:cmake
mkdir build
sudo ./build.sh第四步:启动服务验证
# 启动TinyPB RPC服务
cd bin
nohup ./test_tinypb_server ../conf/test_tinypb_server.xml &
# 验证服务是否启动
ps -elf | grep test_tinypb_server
netstat -tln | grep 20000
# 使用客户端访问服务端
./test_tinypb_server_client
# 期望输出:Send to tinyrpc server 127.0.0.1:20000, requeset body:
# 期望输出:Success get response frrom tinyrpc server 127.0.0.1:20000, response body: res_info: "OK" age: 100100111
# 启动HTTP服务
nohup ./test_http_server ../conf/test_http_server.xml &
# 测试HTTP接口
curl -X GET 'http://127.0.0.1:19999/qps?id=1'
# 期望输出:QPSHttpServlet Echo Success!! Your id is,1问题 | 可能原因 | 解决方案 |
|---|---|---|
编译报错找不到protobuf | protobuf未正确安装 | 检查/usr/include/google/protobuf和/usr/lib/libprotobuf.a |
启动后端口未监听 | 配置文件路径错误 | 检查xml配置中的ip和port |
curl测试无响应 | 服务未正确启动 | 查看nohup.out日志排查 |
coredump | 协程栈溢出 | 增大配置中的coroutine_stack_size |
# 安装wrk压测工具
git clone https://github.com/wg/wrk.git
cd wrk
make -j$(nproc)
sudo cp wrk /usr/local/bin/# 确保HTTP服务已启动且关闭日志(提升性能)
# 在xml配置中设置rpc_log_level为NONE
# 基础测试:1000并发,持续30秒
wrk -c 1000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'
# 高压测试:5000并发
wrk -c 5000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'
# 极限测试:10000并发
wrk -c 10000 -t 8 -d 30 --latency 'http://127.0.0.1:19999/qps?id=1'修改配置文件conf/test_http_server.xml的iothread_num分别为1、4、8、16,重启服务后分别测试。
注意:性能数据与机器配置强相关,以下是两组不同环境下的实测数据。
原作者参考结果(CentOS虚拟机,4核6G,日志关闭):
IO线程数 | 1000并发 | 2000并发 | 5000并发 | 10000并发 |
|---|---|---|---|---|
1 | 27K QPS | 26K QPS | 20K QPS | 20K QPS |
4 | 140K QPS | 130K QPS | 123K QPS | 118K QPS |
8 | 135K QPS | 120K QPS | 100K QPS | 100K QPS |
16 | 125K QPS | 127K QPS | 123K QPS | 118K QPS |
实测结果(Ubuntu云服务器,AMD EPYC 7K83 8核15G,日志关闭,4个IO线程):
并发连接数 | QPS | 平均延迟 | P99延迟 |
|---|---|---|---|
1000 | 82K QPS | 20.76ms | 484ms |
2000 | 82K QPS | 13.41ms | 43ms |
5000 | 82K QPS | 10.08ms | 38ms |
10000 | 79K QPS | 12.45ms | 54ms |
开启DEBUG日志时同环境实测仅1.5W QPS,日志对性能影响约5倍。
为什么两台机器QPS差距这么大?
性能测试的绝对值跟CPU型号、虚拟化方案、内核版本、编译优化等因素都有关。原作者14W QPS是在CentOS裸虚拟机上测的,云服务器虽然核心数多但单核性能受虚拟化开销影响。做性能测试不要纠结绝对值,重要的是理解影响性能的因素和优化方向。
为什么开DEBUG日志后QPS暴跌5倍?
生活类比:你每炒一道菜(处理一个请求)都停下来写一页日记(写日志),炒菜速度当然暴跌。虽然是异步日志(先记脑子里再写),但DEBUG级别产生的日志量太大,光往buffer里塞字符串的CPU开销就很可观。生产环境一般设INFO或WARN级别。
为什么并发连接数从1000增到10000,QPS几乎不变?
这说明瓶颈不在连接管理上,而在CPU处理能力上。epoll管理1000个fd和10000个fd开销差别不大(红黑树O(logN)),真正限制QPS的是每个请求的处理耗时。这也是Reactor+协程架构的优势——连接数增加不会线性增加开销。
TinyRPC整体架构
┌───────────────────────────────────────────────────────────────┐
│ 应用层(用户代码) │
│ ┌───────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ HTTP Servlet │ │ RPC Service │ │ RPC Client │ │
│ └───────┬───────┘ └───────┬──────┘ └───────┬──────┘ │
├──────────┼──────────────────┼─────────────────┼───────────────┤
│ RPC调用封装层 │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ TinyPbRpcChannel / TinyPbRpcAsyncChannel │ │
│ └───────────────────────────────────────────────────────┘ │
├───────────────────────────────────────────────────────────────┤
│ 协议编解码层 │
│ ┌───────────────┐ ┌───────────────────┐ │
│ │ HTTP Codec │ │ TinyPB Codec │ │
│ └───────────────┘ └───────────────────┘ │
├───────────────────────────────────────────────────────────────┤
│ 网络传输层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ TcpServer / TcpConnection / TcpClient │ │
│ └─────────────────────────────────────────────────────┘ │
├──────────────────────────────────────────────────────────────┤
│ 核心引擎层 │
│ ┌───────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │ Reactor │ │ Coroutine │ │ Async Logger │ │
│ │ (epoll) │ │ (m:n模型) │ │ (生产者-消费者) │ │
│ └───────────┘ └─────────────┘ └──────────────────┘ │
└──────────────────────────────────────────────────────────────┘tinyrpc/
├── comm/ # 公共工具:日志、配置、线程池
│ ├── log.h/cc # 异步日志系统
│ ├── config.h/cc # XML配置解析
│ └── ...
├── coroutine/ # 协程模块(★核心难点★)
│ ├── coroutine.h/cc # 协程封装(Resume/Yield)
│ ├── coctx.h # 协程上下文(14个寄存器)
│ ├── coctx_swap.S # 汇编实现的上下文切换
│ ├── coroutine_hook.h/cc # 系统调用Hook
│ ├── coroutine_pool.h/cc # 协程池
│ └── memory.h/cc # 协程栈内存管理
├── net/ # 网络模块
│ ├── reactor.h/cc # Reactor事件循环
│ ├── fd_event.h/cc # 文件描述符事件封装
│ ├── timer.h/cc # 定时器
│ ├── tcp/ # TCP模块
│ │ ├── tcp_server.h/cc # TCP服务端
│ │ ├── tcp_connection.h/cc # TCP连接管理
│ │ ├── io_thread.h/cc # IO线程及线程池
│ │ └── ...
│ ├── http/ # HTTP协议模块
│ │ ├── http_codec.h/cc # HTTP编解码
│ │ ├── http_servlet.h/cc # Servlet接口
│ │ └── ...
│ └── tinypb/ # TinyPB自定义协议模块
│ ├── tinypb_codec.h/cc # TinyPB编解码
│ ├── tinypb_rpc_channel.h/cc # 阻塞式RPC调用
│ ├── tinypb_rpc_async_channel.h/cc # 非阻塞式RPC调用
│ └── ...
├── testcases/ # 测试用例
├── conf/ # 配置文件
└── generator/ # 代码生成脚手架先提出问题: 传统网络服务器处理并发连接有什么困难?
传统方案一:多线程/多进程模型
生活类比:一个餐厅来了10000个客人,为每个客人专门请一个服务员。问题是:
传统方案二:异步回调模型(如Node.js)
生活类比:只请一个超级服务员,他用便签纸记录:"3号桌的菜好了就端过去,5号桌要加水"。问题是:
协程方案:用同步的代码,达到异步的性能!
生活类比:请4个服务员(IO线程),但每个服务员都会"分身术"(协程)。当他在等3号桌的菜时(IO等待),他的分身会自动去服务5号桌。菜好了他的分身自动回来继续3号桌的服务。从每桌客人的角度看,服务员一直在服务自己(同步体验);从餐厅老板的角度看,4个人干了10000个人的活(异步性能)。
先提出问题: 所谓"分身术"到底是怎么实现的?CPU怎么知道下次从哪里继续执行?
答案: 保存和恢复CPU寄存器的状态。
函数执行到一半时,CPU的状态完全由几个关键寄存器决定:
TinyRPC使用14个寄存器来完整保存协程状态:
// tinyrpc/coroutine/coctx.h
struct coctx {
void* regs[14]; // 14个寄存器的"快照"
};
enum {
kRBP = 6, // 栈底指针
kRDI = 7, // 第一个参数
kRSI = 8, // 第二个参数
kRETAddr = 9, // 返回地址(即下次从哪继续执行)
kRSP = 13, // 栈顶指针
};生活类比:你正在看一本书(执行协程A),看到第50页时需要去做饭(切换到协程B)。你会夹一个书签在第50页(保存寄存器状态)。做完饭回来,翻到书签那一页继续看(恢复寄存器状态)。coctx就是那个书签,14个regs就是书签上记录的14个关键信息。
深入理解:regs[14]的内存布局
汇编代码里的偏移量(104、48、72...)都是从这个数组算出来的:
regs[14] 数组内存布局(每格8字节,总共112字节)
┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ regs[0] │ regs[1] │ regs[2] │ regs[3] │ regs[4] │ regs[5] │ regs[6] │
│ r15 │ r14 │ r13 │ r12 │ r9 │ r8 │ rbp │
│ 偏移 0 │ 偏移 8 │ 偏移 16 │ 偏移 24 │ 偏移 32 │ 偏移 40 │ 偏移 48 │
├──────────┼──────────┼──────────┼──────────┼──────────┼──────────┼──────────┤
│ regs[7] │ regs[8] │ regs[9] │ regs[10] │ regs[11] │ regs[12] │ regs[13] │
│ rdi │ rsi │ retAddr │ rdx │ rcx │ rbx │ rsp │
│ 偏移 56 │ 偏移 64 │ 偏移 72 │ 偏移 80 │ 偏移 88 │ 偏移 96 │ 偏移 104 │
└──────────┴──────────┴──────────┴──────────┴──────────┴──────────┴──────────┘
汇编里 "movq %rax, 104(%rdi)" → 104 = kRSP(13) × 8
汇编里 "movq %rbp, 48(%rdi)" → 48 = kRBP(6) × 8
所有偏移量都是 数组下标 × 8字节 算出来的,没有魔法。深入理解:为什么是这14个寄存器?
x86-64有16个通用寄存器,但不是每个都需要保存。根据System V AMD64 ABI(Linux函数调用约定):
类别 | 寄存器 | 个数 | 为什么要保存 |
|---|---|---|---|
Callee-saved | rbx, rbp, r12-r15 | 6 | ABI规定函数返回时这些值必须不变。协程切换相当于"假装函数返回了",不保存这6个,上层代码直接崩 |
栈和指令指针 | rsp, retAddr(rip) | 2 | rsp决定"在哪个栈上执行",retAddr决定"从哪行代码继续"。缺一个都无法恢复 |
参数/临时 | rdi, rsi, rdx, rcx, r8, r9 | 6 | 正常函数调用中这些是caller-saved。但协程不是正常调用——setCallBack要通过regs[kRDI]传this指针给CoFunction,且切换可能发生在任意时刻,不保存会导致恢复后寄存器残留别人的值 |
不需要保存 | rax, r10, r11 | -- | rax是返回值/临时变量(汇编里直接当暂存器用了),r10/r11是纯scratch寄存器,切换完会被重新赋值 |
最终:6 + 2 + 6 = 14个寄存器,每个8字节,一个协程上下文仅112字节。对比线程切换要保存的内核栈、浮点寄存器、TLB状态——量级差了两个数量级。
; tinyrpc/coroutine/coctx_swap.S——来自腾讯libco
coctx_swap:
; ===== 第一阶段:保存当前协程的寄存器 =====
; rdi指向当前协程的coctx(第一个参数)
leaq (%rsp),%rax ; 把当前栈顶地址存到rax
movq %rax, 104(%rdi) ; 保存rsp(regs[13])
movq %rbx, 96(%rdi) ; 保存rbx(regs[12])
movq %rcx, 88(%rdi) ; 保存rcx(regs[11])
; ... 保存其他寄存器 ...
movq %rbp, 48(%rdi) ; 保存rbp(regs[6])
; ===== 第二阶段:恢复目标协程的寄存器 =====
; rsi指向目标协程的coctx(第二个参数)
movq 48(%rsi), %rbp ; 恢复rbp
movq 104(%rsi), %rsp ; 恢复rsp(栈切换!)
; ... 恢复其他寄存器 ...
leaq 8(%rsp), %rsp ; 调整栈顶
pushq 72(%rsi) ; 把返回地址压栈
movq 64(%rsi), %rsi ; 恢复rsi
ret ; ret指令会跳转到栈顶地址执行生活类比:两个人在同一台电脑上轮流玩游戏。玩家A把游戏存档(保存寄存器),然后玩家B读档继续玩(恢复寄存器)。coctx_swap就是"存档+读档"这两步操作的原子过程。
关键点:为什么只需要约38行汇编?
因为协程切换是用户态操作,不需要陷入内核。相比线程切换需要:保存/恢复所有寄存器→切换内核栈→刷新TLB→切换地址空间,协程切换只需要保存/恢复14个寄存器,快了几个数量级。
深入理解:恢复阶段最后三步(最精妙的部分)
movq 104(%rsi), %rsp恢复栈指针后,rsp指向目标协程的栈。此时栈顶放的是上次call coctx_swap压入的旧返回地址,但我们要用coctx里保存的返回地址(regs[9])来替换它:
第①步:leaq 8(%rsp), %rsp
rsp += 8,跳过栈上旧的返回地址(我们不要它)
┌────────────────────────┐
│ ...上层栈帧... │ ← rsp 现在指向这里
├────────────────────────┤
│ 旧的返回地址(已跳过) │
└────────────────────────┘
第②步:pushq 72(%rsi)
72 = kRETAddr(9) × 8,即coctx里保存的返回地址
pushq = rsp减8,把值写到栈顶
┌────────────────────────┐
│ ...上层栈帧... │
├────────────────────────┤
│ regs[9]的返回地址 │ ← rsp 指向这里(新压入的)
└────────────────────────┘
第③步:ret
ret = pop栈顶 → rip,CPU跳转到regs[9]保存的地址继续执行
也就是目标协程上次Yield时的下一条指令
效果:用coctx里的返回地址替换了栈上的旧地址,ret跳转过去。切换完成。为什么用leaq+pushq而不是直接覆盖栈顶?因为x86的mov不支持内存到内存操作,直接覆盖需要一个额外寄存器做中转。而leaq+pushq只用rsp自己就完成了,不污染任何其他寄存器——手工汇编中"能少用一个寄存器就少用一个"。
Resume(唤醒): 主协程 → 目标协程 "老板说:你去干活"
Yield(让出): 目标协程 → 主协程 "打工人说:我干不动了,先歇会"核心代码解析:
// 从主协程切换到目标协程
void Coroutine::Resume(Coroutine* co) {
// 安全检查:只有主协程才能Resume其他协程
if (t_cur_coroutine != t_main_coroutine) {
return; // 必须由"老板"来分配任务
}
t_cur_coroutine = co; // 标记当前执行的协程
coctx_swap(&(t_main_coroutine->m_coctx), &(co->m_coctx));
// ↑ 保存主协程状态 ↑ 恢复目标协程状态
}
// 从目标协程切换回主协程
void Coroutine::Yield() {
Coroutine* co = t_cur_coroutine;
t_cur_coroutine = t_main_coroutine; // 切回主协程
coctx_swap(&(co->m_coctx), &(t_main_coroutine->m_coctx));
// ↑ 保存当前协程状态 ↑ 恢复主协程状态
}生活类比:一个老板(主协程)管理多个员工(子协程)。
先提出问题: 如果用户代码里写了read(fd, buf, count),这是阻塞调用,会卡住整个线程,怎么办?
答案: Hook(钩子)——偷梁换柱,把系统调用替换成协程版本。
// 原理:利用dlsym获取原始系统调用的地址
#define HOOK_SYS_FUNC(name) \
name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name);
HOOK_SYS_FUNC(read); // g_sys_read_fun指向真正的read系统调用
HOOK_SYS_FUNC(write);
HOOK_SYS_FUNC(connect);
HOOK_SYS_FUNC(accept);
HOOK_SYS_FUNC(sleep);Hook版的read完整流程(7步):
// tinyrpc/coroutine/coroutine_hook.cc 第66-108行
ssize_t read_hook(int fd, void *buf, size_t count) {
// ① 主协程检查——主协程不能Yield,直接走原版
if (Coroutine::IsMainCoroutine()) return g_sys_read_fun(fd, buf, count);
// ② 设为非阻塞——Hook的前提是非阻塞IO
fd_event->setNonBlock();
// ③ 先尝试直接读
ssize_t n = g_sys_read_fun(fd, buf, count);
if (n > 0) return n; // 有数据就直接返回
// ④ 没数据(EAGAIN),注册epoll监听
toEpoll(fd_event, READ);
// ⑤ 让出协程,切回主协程
Coroutine::Yield();
// ⑥ 被Resume唤醒,取消epoll监听
fd_event->delListenEvents(READ);
fd_event->clearCoroutine();
// ⑦ 真正读数据
return g_sys_read_fun(fd, buf, count);
}关键细节:
生活类比:你去银行办业务(read),发现柜台没人(数据未就绪)。
端到端时序图:从用户调read()到数据返回
用户代码 read_hook() epoll/Reactor 网络
│ │ │ │
│ read(fd,buf,n) │ │ │
│─────────────────────→ │ │ │
│ │ setNonBlock │ │
│ │ g_sys_read_fun() │ │
│ │ 返回EAGAIN │ │
│ │ │ │
│ │ toEpoll(fd,READ) ──→ │ 注册fd可读监听 │
│ │ Coroutine::Yield()──→ │ │
│ │ ┌────────────────┐ │ │
│ │ │ 协程A被挂起 │ │ │
│ │ │ CPU去处理 │ │ │
│ │ │ 其他协程B,C.. │ │ │
│ │ └────────────────┘ │ │
│ │ │ ←── 数据到达 ──│
│ │ │ epoll_wait返回 │
│ │ ←──── Resume(协程A) ──────│ │
│ │ delListenEvents │ │
│ │ g_sys_read_fun() ──────→ │ │
│ │ 返回n字节 │ │
│ ←───── 返回n ─────── │ │ │
│ 继续执行... │ │ │
用户代码只看到:调了read()→拿到了数据。完全感知不到中间的Yield/Resume。connect_hook——比read更复杂的场景
connect_hook需要额外处理超时:非阻塞connect返回EINPROGRESS后,有两种情况可能唤醒协程:
// tinyrpc/coroutine/coroutine_hook.cc 第191-263行(简化)
int connect_hook(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
fd_event->setNonBlock();
int n = g_sys_connect_fun(sockfd, addr, addrlen);
if (n == 0) return 0; // 立即连上
if (errno != EINPROGRESS) return n; // 真的失败
toEpoll(fd_event, WRITE); // 连接完成时fd变为可写
// 同时注册超时定时器
bool is_timeout = false;
auto timeout_cb = [&is_timeout, cur_cor]() {
is_timeout = true;
Coroutine::Resume(cur_cor); // 超时也唤醒协程
};
timer->addTimerEvent(timeout_event);
Coroutine::Yield(); // 等连接完成或超时
// 唤醒后用is_timeout区分原因
timer->delTimerEvent(timeout_event); // 清理定时器
if (is_timeout) { errno = ETIMEDOUT; return -1; }
return 0;
}精妙之处:epoll回调(连接成功)和定时器回调(超时)都可能Resume同一个协程,用is_timeout标志位区分唤醒原因。唤醒后必须同时清理epoll事件和定时器,否则可能被二次唤醒。
先提出问题: 1:n模型(一个线程对应多个协程)有什么缺陷?
答案: 如果某个协程的业务逻辑特别耗时(比如复杂计算),同一线程上的其他协程都得等它。
生活类比:1:n模型=一个快递员负责一整栋楼。如果他被10楼的大件快递耽误了,1-9楼全部得等。 m:n模型=4个快递员共同负责一整栋楼。10楼的大件耽误了一个快递员,其他3个继续送1-9楼。
1:n vs m:n的直观对比:
======= 1:n模型 =======
IO线程1: [epoll] → Resume(C1) → Resume(C2) → Resume(C3)
↑ C3很慢,C1/C2被堵
IO线程2: [epoll] → Resume(C4) ← 很闲,帮不上忙
IO线程3: [epoll] → Resume(C5) ← 很闲,帮不上忙
问题:C3阻塞了IO线程1上的所有协程,线程2和3干瞪眼。
======= m:n模型 =======
全局队列: [C1, C2, C3, C4, C5]
IO线程1: pop→C1→处理完→pop→C3→C3很慢→Yield→C3回到队列
IO线程2: pop→C2→处理完→pop→C4→处理完→pop→C3→继续C3的工作
IO线程3: pop→C5→处理完→队列空了→epoll_wait等新事件
结果:C3很慢?没关系,它Yield后回到队列,
IO线程2有空了就接着处理。没有任何线程被堵死。TinyRPC的实现:
// 全局协程任务队列,所有IO线程共享
// tinyrpc/net/reactor.h 第124-135行
class CoroutineTaskQueue {
std::queue<FdEvent*> m_task;
Mutex m_mutex; // 多线程共享,必须加锁
};
// Reactor事件循环中,SubReactor从全局队列取协程执行
// tinyrpc/net/reactor.cc 第233-244行
if (m_reactor_type != MainReactor) {
while (1) {
FdEvent* ptr = CoroutineTaskQueue::GetCoroutineTaskQueue()->pop();
if (ptr) {
ptr->setReactor(this); // ★ 关键:更新Reactor指针
Coroutine::Resume(ptr->getCoroutine());
} else {
break;
}
}
}关键细节:ptr->setReactor(this)
协程C上次在IO线程1上执行,现在被IO线程2取出来了。setReactor(this)把fd关联的Reactor更新为IO线程2的。如果不更新,下次read_hook注册epoll时会注册到IO线程1的Reactor上。IO线程1收到事件后Resume协程C——但IO线程2可能还在操作协程C的数据。同一个协程被两个线程同时操作 = 数据竞争 = coredump。
first_coroutine优化:减少锁争用
// tinyrpc/net/reactor.cc 第294-304行
if (ptr->getCoroutine()) {
if (!first_coroutine) {
first_coroutine = ptr->getCoroutine(); // 第一个就绪协程不入队
continue;
}
CoroutineTaskQueue::GetCoroutineTaskQueue()->push(ptr); // 其余入全局队列
}
// 循环开头直接Resume
if (first_coroutine) {
Coroutine::Resume(first_coroutine); // 省掉push+pop各一次锁操作
first_coroutine = NULL;
}每次epoll_wait返回可能有多个就绪事件。第一个直接在当前线程Resume,省掉两次锁操作。高并发下每秒几十万次协程调度,这个优化的累积效果很可观。
注意事项:m:n模型下,同一个协程可能被不同IO线程Resume,因此:
先提出问题: 协程的创建过程(分配栈内存、初始化coctx、设回调)看起来不复杂,为什么要搞一个池子?
不用池子时,每个请求的开销:
每个请求到来:
① 分配128KB栈内存 ← 内核需要建立虚拟地址映射
② new Coroutine对象 ← 堆分配
③ setCallBack初始化coctx ← 纯计算,很快
④ 执行业务...
请求处理完:
⑤ 释放128KB栈内存 ← 归还内存
⑥ delete Coroutine对象 ← 堆释放
14万QPS = 每秒执行14万次 ①②⑤⑥ = CPU全在做内存管理真正的性能杀手:缺页中断(Page Fault)
不管用malloc还是mmap分配内存,Linux的按需分配(Demand Paging)策略都一样:内核只给虚拟地址,物理内存在首次写入时才分配。每次写入未映射的页面触发一次缺页中断,陷入内核态分配物理页。
128KB栈 = 32个4KB页面。一个新协程最坏触发32次缺页中断。14万QPS × 最多32次 = 每秒最多448万次缺页中断,CPU基本在内核态打转。
用了池子之后:
初始化阶段(只做一次):
Memory一次性分配 pool_size × 128KB
第一轮使用触发缺页,之后物理页就一直在了
运行阶段(每个请求):
① getCoroutineInstanse() 从数组取一个 ← O(N)遍历,纯用户态
② setCallBack重置coctx ← 纯计算
③ 执行业务...
④ returnCoroutine() 标记为空闲 ← 改一个bool
没有内存分配/释放,没有缺页中断源码实现:
// tinyrpc/coroutine/coroutine_pool.cc 第46-73行
Coroutine::ptr CoroutinePool::getCoroutineInstanse() {
Mutex::Lock lock(m_mutex); // m:n模型下多线程共享,必须加锁
for (int i = 0; i < m_pool_size; ++i) {
if (!m_free_cors[i].first->getIsInCoFunc() && !m_free_cors[i].second) {
m_free_cors[i].second = true;
Coroutine::ptr cor = m_free_cors[i].first;
lock.unlock();
return cor;
}
}
// 池中没有可用协程,扩容
m_memory_pool.push_back(std::make_shared<Memory>(m_stack_size, m_pool_size));
return std::make_shared<Coroutine>(m_stack_size, m_memory_pool.back()->getBlock());
}为什么优先复用"用过的"协程? 遍历顺序从i=0开始,先被使用过的协程排在前面。它们的栈内存已经被写入过,物理页早就分配好了,是"热"内存。后面的可能从未被使用,虚拟地址有了但物理页还没有,一用就触发缺页中断。
关于栈内存分配方式: 当前代码使用malloc(memory.cc第15行)。原版libco使用mmap,好处是可以配合mprotect在栈底设guard page——栈溢出时触发SIGSEGV而不是默默踩坏别人的内存。当前用malloc做不到这一点,这是一个可优化的方向。
生活类比:共享单车站(协程池)。需要骑车时从站点拿一辆(getCoroutineInstanse),用完还回去(returnCoroutine),而不是每次都买新的再扔掉。优先拿"热车"(最近用过的,物理内存已分配),而不是"冷车"(从未使用,首次骑要触发缺页分配物理页)。
先提出问题: 一个服务端需要同时监听成千上万个连接的IO事件,怎么高效处理?
答案: Reactor模式——一个"调度中心"统一监听所有事件,事件就绪后分发给对应的处理器。
生活类比:医院的导诊台(Reactor)。所有病人(连接)到了先去导诊台登记,导诊台统一安排:"3号去内科,5号去外科"。而不是让每个科室的医生自己出来招揽病人。
┌──────────────────┐
│ MainReactor │
│ (主线程) │
│ 职责:accept │
│ 新连接 │
└─────────┬────────┘
│ 分发新连接
┌───────────────┼──────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ SubReactor │ │ SubReactor │ │ SubReactor │
│ IO线程1 │ │ IO线程2 │ │ IO线程3 │
│ epoll_wait │ │ epoll_wait │ │ epoll_wait │
│ 处理读写 │ │ 处理读写 │ │ 处理读写 │
└────────────┘ └────────────┘ └────────────┘
↕ ↕ ↕
┌─────────────────────────────────────────────────┐
│ 全局协程任务队列(加锁) │
│ 协程A 协程B 协程C 协程D ... │
└─────────────────────────────────────────────────┘Reactor::loop()是整个框架的调度引擎(tinyrpc/net/reactor.cc第210-367行),每次循环做5件事:
Reactor::loop() 单次循环:
① 从全局队列pop就绪协程并Resume(仅SubReactor)
→ 配合m:n模型,从CoroutineTaskQueue取其他线程投递的协程
→ 注意:pop后要setReactor(this)更新Reactor指针
② 执行pending_tasks
→ 其他线程投递的异步任务(如注册新fd),先放进队列,这里统一执行
③ epoll_wait等待事件
→ 阻塞直到有IO事件就绪、定时器到期、或被wakeup唤醒
④ 事件分发
→ wakeup fd → 读掉eventfd的数据
→ timer fd → 直接执行定时回调
→ 有关联协程 → 第一个直接Resume(first_coroutine优化),后续入全局队列
→ 有回调函数 → 放入pending_tasks
⑤ 处理跨线程投递的fd添加/删除
→ 将pending_add_fds和pending_del_fds统一应用到epoll这5步的执行顺序不能随意调换:先处理全局队列(可能有其他线程投递的紧急任务)→再执行pending任务→然后才epoll_wait等新事件→处理新事件后更新epoll状态。
// 创建(tinyrpc/net/reactor.cc 第52行)
m_wake_fd = eventfd(0, EFD_NONBLOCK);
// 唤醒(tinyrpc/net/reactor.cc 第122-133行)
void Reactor::wakeup() {
if (!m_is_looping) return;
uint64_t tmp = 1;
g_sys_write_fun(m_wake_fd, &tmp, 8); // 往eventfd写个1
}生活类比:你在午睡(epoll_wait阻塞),手机闹钟响了(wakeup),你醒来看看有什么新任务。eventfd就是那个闹钟——跨线程安全地唤醒正在epoll_wait的线程。
为什么用eventfd而不是pipe? pipe需要两个fd(一读一写),eventfd只需要一个,更节约资源。eventfd还支持信号量语义(EFD_SEMAPHORE),比pipe更适合这种"通知"场景。
1. 创建IOThreadPool(N个IO线程,每个线程一个SubReactor)
2. 创建MainReactor
3. 创建accept协程,绑定MainAcceptCorFunc
4. Resume accept协程 → 开始监听新连接
5. 启动所有IO线程(sem_post)
6. MainReactor进入loop
MainAcceptCorFunc循环:
└→ accept(hook版)→ 无新连接则Yield
└→ 有新连接 → 创建TcpConnection → 分配到某个IO线程
└→ 将连接的协程加入该IO线程的ReactorTinyRPC使用时间轮来管理空闲连接的超时清理。
生活类比:停车场按小时收费。每过一个小时,管理员检查最外圈的停车位,超时的车就被拖走(关闭连接)。新来的车或者续费的车会被放到最内圈(刷新计时)。这比给每辆车设置一个独立闹钟(每个连接一个定时器)要高效得多。
┌────────┬────────┬──────────────┬──────────────┬──────────────────┬──────────┬─────────────┬──────────┬──────────┬────────────┬──────┐
│ start │ pk_len │ msg_req_len │ msg_req │ service_name_len │ service │ err_code │ err_info │ pb_data │ check_num │ end │
│ 0x02 │ 4B │ 4B │ 变长 │ 4B │ 变长 │ 4B │ 变长 │ 变长 │ 4B │0x03 │
└────────┴────────┴──────────────┴──────────────┴──────────────────┴──────────┴─────────────┴──────────┴──────────┴────────────┴──────┘最小包大小:1+4+4+4+4+4+4+1 = 26字节
对比项 | HTTP | TinyPB |
|---|---|---|
格式 | 文本协议 | 二进制协议 |
解析效率 | 需要解析文本Header | 直接按偏移量读取 |
包体大小 | Header较大 | 最小26字节 |
适用场景 | 浏览器/外部调用 | 内部服务间通信 |
生产者(多个IO线程) 消费者(日志线程)
│ │
├─ pushRpcLog(msg) ──→ buffer │
├─ pushAppLog(msg) ──→ buffer │
│ │ 定时flush
│ ├──→ 写入RPC日志文件
│ └──→ 写入APP日志文件特点:
难在哪里? 汇编级别的寄存器操作,任何一个寄存器保存/恢复的顺序错误,都会导致不可预测的coredump。
关键理解点:
1.栈对齐:x86-64 ABI要求栈指针16字节对齐,否则SSE指令会Segfault
top = reinterpret_cast<char*>((reinterpret_cast<unsigned long>(top)) & -16LL);2.首次Resume的巧妙设计:通过预设寄存器值,让coctx_swap的ret跳转到CoFunction
m_coctx.regs[kRETAddr] = reinterpret_cast<char*>(CoFunction);
m_coctx.regs[kRDI] = reinterpret_cast<char*>(this); // CoFunction的参数3.CoFunction结束后自动Yield:防止协程执行完毕后"跑飞"
void CoFunction(Coroutine* co) {
co->m_call_back(); // 执行用户回调
Coroutine::Yield(); // 回调结束后必须让出,否则CPU会执行到非法地址
}难在哪里? 需要在hook函数中正确地与epoll和协程交互,时序必须精确。
以connect_hook为例,要处理的边界情况:
// 超时回调和epoll回调都可能Resume协程
// 必须用is_timeout标志区分唤醒原因
auto timeout_cb = [&is_timeout, cur_cor]() {
is_timeout = true;
Coroutine::Resume(cur_cor);
};难在哪里? 同一个协程可能在线程A中Yield,在线程B中Resume。
必须注意的问题:
TinyRPC的优化: 第一个就绪协程直接在当前线程Resume,避免入队出队的锁开销
难在哪里? 跨线程、跨协程的对象,谁来负责释放?
典型场景:非阻塞RPC调用
// 这些对象在线程A创建,在线程B的新协程中使用
std::shared_ptr<queryAgeReq> rpc_req = std::make_shared<queryAgeReq>();
std::shared_ptr<queryAgeRes> rpc_res = std::make_shared<queryAgeRes>();
// 必须调用saveCallee预留引用计数!
// 否则rpc_req/rpc_res可能在函数返回后被析构,线程B访问就coredump
async_channel->saveCallee(rpc_controller, rpc_req, rpc_res, closure);原则: 所有跨协程/跨线程传递的对象,一律使用shared_ptr,并确保引用计数正确。
难在哪里? epoll事件、协程状态、定时器三者的交互逻辑复杂。
Reactor::loop的关键调度逻辑:
1.先处理全局队列中的就绪协程(可能来自其他线程)
2.执行pending_tasks(fd的添加/删除等)
3.epoll_wait等待事件
4.收到事件后:
5.处理跨线程投递的fd添加/删除
这个顺序不能随意调换,否则会出现:协程被错误唤醒、事件丢失、死锁等问题。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。