为了打造高性能的服务器,通常思路有两种:1.多cpu利用,并发执行,如多进程和多线程 2、提高每个cpu的使用率,让每个cpu高效起来。
传统的多进程方案,一个新的客户端,单独交给子进程处理,如:apache,伪代码
while(new_requst)
{
if(fork() != 0)
{
...
exit(0);
}
}i/o挂起和时间片轮询促使进程释放cpu,切换上下文进程,提高cpu的效率,随着客户端连接的不断增加,进程数相应增加,进程切换的代价导致系统的可用性降低。
多线程是多进程的改进版本,线程切换的代价比进程切换低很多,线程的共享全局数据区,共享数据的临界区访问,需要同步机制,同步机制降低性能,线程之间的共享数据区,一个线程会影响整个进程的运行。
注:进程和线程的调度切换可以参考 https://www.jianshu.com/p/91c8600cb2ae
多进程和多线程都是依赖操作系统的调用,当io出现阻塞的时候,将进程(线程)切换,另一个思路,出现i/o阻塞,进程内部自己去管理所有的任务,将阻塞任务挂起,调用可运行的任务。异步i/o和协程是两个比较通用的方案。
异步i/o通过状态回调事件,当i/o事件触发,通过注册的处理函数处理对应的事件,以epoll代码为例: n = epoll_wait();
for (i = 0; i < nfd; i++)
{
if(writable(epoll_data[i]))
{
write_handle(epoll_data[i]);
}
if(readable(epoll_data[i]))
{
read_handle(epoll_data[i]);
}
if(error(epoll_data[i]))
{
error_handle(epoll_data[i]);
}
}调度的核心在epoll事件,epoll事件注册回调,epoll事件的触发,调用回调信息,每个阶段处理的epoll注册的epoll回调不一样,类似状态处理机,需要记录当前处理状态(处理的函数),处理完成后切换到的下一个状态(另一个处理函数)。状态的管理比较复杂,代码的可读性不高。
协程有两种通用的实现方式:
1. glibc 实现 ucontext
2. C 语言的 setjmp 和 longjmp, state thread(st)采用jmp实现
两种实现的本质是一样的,协程通过保存当前运行的寄存器和栈,切换后恢复寄存器和栈,st和ucontext的不同之处st忽略了信号量的处理,st内部不会直接处理信号量信息,st建议通过pipe方式实现信号量的处理。(参考:http://state-threads.sourceforge.net/docs/notes.html#signals)
st是保障cpu的能一直跑任务,任务挂起后,调度下一个任务。如果没有可运行的任务,阻塞直到有新的i/o事件产生,触发新的任务。如cpu管理进程的思路,协程主要将任务分为4种状态:
1. running —— 正在执行的协程
2. runnable —— 可以执行的协程,等待被调度
3. sleep —— 需要等待i/o事件,在等待队列中
4. zombie —— 已经执行完毕的协程
st的状态调度如图:

dispatch的st的主调度模块,主要负责将io事件和超时事件转换为runnalbe task,源码:
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
/* Idle vp till I/O is ready or the smallest timeout expired */
_ST_VP_IDLE();
/* Check sleep queue for expired threads */
_st_vp_check_clock();
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
/* No more threads */
exit(0);
/* NOTREACHED */
return NULL;
}
st_vp_idle是i/o事件,st可以选择select、poll、kqueue模型,本文主要介绍下epoll的实现,epoll的实现有几个重要环节:
1.timeout的获取,通过等待队列最小堆方式排序,获取堆顶元素,如果堆为空,则阻塞等待
2. wait i/o事件
3. 找到文件句柄对应的处理协程,将协程放入可执行队列内
ST_HIDDEN void _st_epoll_dispatch(void)
{
...
if (_ST_SLEEPQ == NULL) {
timeout = -1;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 :
(_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout = (int) (min_timeout / 1000);
}
/* Check for I/O operations */
nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
_st_epoll_data->evtlist_size, timeout);
if (nfd > 0) {
for (i = 0; i < nfd; i++) {
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
/* Also set I/O bits on error */
_ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
}
}
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
...
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);
if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
}
for (i = 0; i < nfd; i++) {
/* Delete/modify descriptors that fired */
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = 0;
events = _ST_EPOLL_EVENTS(osfd);
op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = events;
ev.data.fd = osfd;
if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 &&
op == EPOLL_CTL_DEL) {
_st_epoll_data->evtlist_cnt--;
}
}
}
}
注:st的epoll实现效率不高,需要遍历所有的_ST_IOQ这个实现,这个逻辑违背了epoll的实现,退化为select模型 _st_vp_check_clock的实现比较巧妙,通过最小堆形式保存超时等待的任务,最早超时的在顶部,所以每次加入和删除超时队列,只需要做对应的heap调整,避免了全部排序,当出现超时将协程状态变为超时,加入可执行队列状态。源码:
void _st_vp_check_clock(void)
{
_st_thread_t *thread;
st_utime_t elapsed, now;
now = st_utime();
elapsed = now - _ST_LAST_CLOCK;
_ST_LAST_CLOCK = now;
if (_st_curr_time && now - _st_last_tset > 999000) {
_st_curr_time = time(NULL);
_st_last_tset = now;
}
while (_ST_SLEEPQ != NULL) {
thread = _ST_SLEEPQ;
ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);
if (thread->due > now)
break;
_ST_DEL_SLEEPQ(thread);
/* If thread is waiting on condition variable, set the time out flag */
if (thread->state == _ST_ST_COND_WAIT)
thread->flags |= _ST_FL_TIMEDOUT;
/* Make thread runnable */
ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));
thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(thread);
}
}初始化最重要的两个点
1. 协程栈的初始化;协程栈通过mmap分配
2. 设置setjimp,当dispatch调度任务后进入协程入库函数
st协程的初始化源码:
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
_st_thread_t *trd;
_st_stack_t *stack;
void **ptds;
char *sp;
/* Adjust stack size */
if (stk_size == 0) {
stk_size = ST_DEFAULT_STACK_SIZE;
}
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
stack = _st_stack_new(stk_size);
if (!stack) {
return NULL;
}
/* Allocate thread object and per-thread data off the stack */
sp = stack->stk_top;
/*
* The stack segment is split in the middle. The upper half is used
* as backing store for the register stack which grows upward.
* The lower half is used for the traditional memory stack which
* grows downward. Both stacks start in the middle and grow outward
* from each other.
*/
/**
The below comments is by winlin:
The Stack public structure:
+--------------------------------------------------------------+
| stack |
+--------------------------------------------------------------+
bottom top
The code bellow use the stack as:
+-----------------+-----------------+-------------+------------+
| stack of thread |pad+align(128B+) |thread(336B) | keys(128B) |
+-----------------+-----------------+-------------+------------+
bottom sp trd ptds top
(context[0].__jmpbuf.sp) (private_data)
*/
sp = sp - (ST_KEYS_MAX * sizeof(void *));
ptds = (void **) sp;
sp = sp - sizeof(_st_thread_t);
trd = (_st_thread_t *) sp;
/* Make stack 64-byte aligned */
if ((unsigned long)sp & 0x3f) {
sp = sp - ((unsigned long)sp & 0x3f);
}
stack->sp = sp - _ST_STACK_PAD_SIZE;
memset(trd, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
/* Initialize thread */
trd->private_data = ptds;
trd->stack = stack;
trd->start = start;
trd->arg = arg;
if (MD_SETJMP((trd)->context)) {
_st_thread_main();
}
MD_GET_SP(trd) = (long) (stack->sp);
/* If thread is joinable, allocate a termination condition variable */
if (joinable) {
trd->term = st_cond_new();
if (trd->term == NULL) {
_st_stack_free(trd->stack);
return NULL;
}
}
/* Make thread runnable */
trd->state = _ST_ST_RUNNABLE;
_st_active_count[THREAD_INDEX]++;
_ST_ADD_RUNQ(trd);
return trd;
}
协程的上下文切换通过调度切换,当出现i/o或者协程退出后发生上下文切换,切换逻辑,遍历runnable列表
1.有可执行任务,直接恢复可执行任务的上下文
2. 没有有可执行任务,调用dispatch,知道出现了新的可执行的事件
切换源码如下:
// 切换阻塞协程
#define _ST_SWITCH_CONTEXT(_thread) \
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \
if (!MD_SETJMP((_thread)->context)) { \
_st_vp_schedule(); \
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \
ST_END_MACRO
// 恢复协程上下文
#define _ST_RESTORE_CONTEXT(_thread) \
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \
MD_LONGJMP((_thread)->context, 1); \
ST_END_MACRO
void _st_vp_schedule(void)
{
_st_thread_t *thread;
unsigned int cost_time;
static __thread char buf[80];
if (_ST_RUNQ.next != &_ST_RUNQ) {
/* Pull thread off of the run queue */
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
/* If there are no threads to run, switch to the idle thread */
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/* Resume the thread */
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}st巧妙的利用了jump由用户程序自发的调用cpu,相比线程有私有栈的跳转切换小了很多,整个调度过程是FIFO,没有抢占模式。st源码vp等全局变量使得,st只支持单线程(多进程数据独立,可以分别使用)。由于团队业务的一些特殊性,需要st支持多线程,将st的全局变量进行响应的封装,可以将st改造成多线程版本。
1. 主页源码:https://sourceforge.net/projects/state-threads/files/
2. 进程、线程、协程参考:http://kexianda.info/2017/05/19/进程-线程-协程的实现原理/
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。