
实现高并发服务器的基础是实现基于事件触发的Reactor模型,通过Reactor模型对事件进行统一管理。对此我们需要设计:
// Channel类管理事件集
class Poller;
class EventLoop;
class Channel
{
private:
int _fd;
// Poller *_poller;
EventLoop *_loop;
uint32_t _events; // 需要监控的事件集
uint32_t _revents; // 当前连接就绪的事件集
using EventCallBack = std::function<void()>;
// 五个回调函数
EventCallBack _read_cb; // 可读事件回调函数
EventCallBack _write_cb; // 可写事件回调函数
EventCallBack _close_cb; // 连接断开事件回调函数
EventCallBack _error_cb; // 错误事件回调函数
EventCallBack _event_cb; // 任意事件回调函数
public:
Channel(EventLoop *loop, int fd) : _fd(fd), _loop(loop), _events(0), _revents(0) {}
// 设置回调函数
void SetReadCallBack(const EventCallBack &cb) { _read_cb = cb; }
void SetWriteCallBack(const EventCallBack &cb) { _write_cb = cb; }
void SetCloseCallBack(const EventCallBack &cb) { _close_cb = cb; }
void SetErrorCallBack(const EventCallBack &cb) { _error_cb = cb; }
void SetEventCallBack(const EventCallBack &cb) { _event_cb = cb; }
// 设置Revents函数
void SetRevents(uint32_t events)
{
// LOG(DEBUG, "事件更新revents:%d\n", events);
_revents = events;
}
// 返回需要监控的事件集
uint32_t Events() { return _events; }
int Fd() { return _fd; }
// 检查当前是否可读
bool Readable() { return (_events & EPOLLIN); }
// 检查当前是否可写
bool Writeable() { return (_events & EPOLLOUT); }
// 启动/关闭可写监控
void EnableRead()
{
LOG(DEBUG, "fd:%d 加入EPOLLIN监控\n", _fd);
_events |= EPOLLIN;
Update();
}
void DisableRead()
{
_events &= ~EPOLLIN;
Update();
}
// 启动/关闭可读监控
void EnableWrite()
{
_events |= EPOLLOUT;
Update();
}
void DisableWrite()
{
_events &= ~EPOLLOUT;
Update();
}
// 关闭所有事件的监控
void DisableAll()
{
_events = 0;
Update();
}
// 移除监控 --- 涉及poller 要在poller之后进行实现
void Remove();
// 更新Channel的事件监控
void Update();
// HandleEvent用来进行事件处理!
void HandleEvent()
{
// 根据revents判断需要执行哪些回调函数
// 可读事件 半关闭连接 带外数据 紧急数据
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_read_cb)
_read_cb();
// 不管任何事件都要调用的回调函数!
if (_event_cb)
_event_cb(); // 放到事件处理完之后调用 刷新活跃度
}
// --- 有可能释放连接的操作事件 , 一次只处理一个!---
// 可写事件
if (_revents & EPOLLOUT)
{
if (_write_cb)
_write_cb();
// 不管任何事件都要调用的回调函数!
if (_event_cb)
_event_cb(); // 放到事件处理完之后调用 刷新活跃度
}
// 错误事件
else if (_revents & EPOLLERR)
{
// 不管任何事件都要调用的回调函数!
if (_event_cb)
_event_cb();
// _event_cb必须放在前面 ,因为出错就会释放连接
if (_error_cb)
_error_cb();
}
// 连接关闭
else if (_revents & EPOLLHUP)
{
if (_event_cb)
_event_cb();
// _event_cb必须放在前面 ,因为出错就会释放连接
if (_close_cb)
_close_cb();
}
}
};
//...
//...
void Channel::Remove() { return _loop->RemoveEvent(this); }
// 更新Channel的事件监控
void Channel::Update() { return _loop->UpdateEvent(this); }Poller模块是对描述符IO进行监控的模块。
其中对多路转接的接口进行封装:
所以:
struct epoll_event数组,监控时获取所有的活跃事件运行逻辑为:
这里最核心的部分:事件循环Poll 方法。
Poll 方法是事件循环的核心,它调用epoll_wait阻塞等待事件发生。当有事件发生时,epoll_wait 返回,Poll 方法遍历返回的事件列表 _evs,根据事件对应的文件描述符在 _event_channels 中找到对应的Channel对象,设置事件类型,并将其加入到活跃事件列表 active 中,以供上层进行后续处理。
// 多路转接方法Poller
#define MAX_POLLERSIZE 1024
class Poller
{
private:
int _epfd; // Epoll模型
struct epoll_event _evs[MAX_POLLERSIZE];
std::unordered_map<int, Channel *> _event_channels; // fd与Channel的映射表
private:
void Update(Channel *channel, int op)
{
// 根据channel初始化
struct epoll_event ev;
ev.data.fd = channel->Fd();
ev.events = channel->Events();
int ret = ::epoll_ctl(_epfd, op, channel->Fd(), &ev);
if (ret < 0)
{
LOG(ERROR, "epoll_ctl failed!\n");
::abort(); // 直接退出程序
}
return;
}
bool HasChannel(Channel *channel)
{
auto it = _event_channels.find(channel->Fd());
if (it == _event_channels.end())
{
return false;
}
return true;
}
public:
// 构造函数
Poller()
{
_epfd = ::epoll_create(MAX_POLLERSIZE);
if (_epfd < 0)
{
LOG(FATAL, "epoll_create failed!\n");
::abort();
}
}
// 更新Event
void UpdateEvent(Channel *channel)
{
// 先判断channel是否在channels中
if (HasChannel(channel) == false)
{
// 先建立托管
//_event_channels[channel->Fd()] = channel;
LOG(INFO, "Poller 加入新的fd事件托管 fd:%d\n", channel->Fd());
_event_channels.insert(std::make_pair(channel->Fd(), channel));
// 进行添加
return Update(channel, EPOLL_CTL_ADD);
}
else
{
// 进行更新
LOG(INFO, "Poller 加入事件托管 fd:%d\n", channel->Fd());
return Update(channel, EPOLL_CTL_MOD);
}
}
// 移除Event
void RemoveEvent(Channel *channel)
{
// 先判断channel是否在channels中
if (HasChannel(channel) == false)
{
// 不在_event_channel里直接返回!
return;
}
// 进行移除
_event_channels.erase(channel->Fd());
Update(channel, EPOLL_CTL_DEL);
}
// 开始监控事件
void Poll(std::vector<Channel *> *active)
{
// 进行监控
// 阻塞式等待
int nfds = ::epoll_wait(_epfd, _evs, MAX_POLLERSIZE, -1);
if (nfds < 0)
{
if (errno == EINTR)
return;
LOG(ERROR, "epoll_wait error:%s", strerror(errno));
::abort();
}
// 对evs中的事件进行处理
// LOG(DEBUG, "Poll 获取到新事件 n:%d\n", nfds);
for (int i = 0; i < nfds; i++)
{
auto it = _event_channels.find(_evs[i].data.fd);
// 判断是否存在 不存在直接返回
assert(it != _event_channels.end());
// LOG(DEBUG, "channel : %d _evs[i].events:%d\n", _evs[i].data.fd, _evs[i].events);
// 进行调用
it->second->SetRevents(_evs[i].events);
// 进行处理
// it->second->HandleEvent();
// 处理结束放入活跃队列
active->push_back(it->second);
}
}
};EventLoop模块是管理事件监控管理的模块,就是Reactor反应堆模型。该模块与线程一一对应关联!
监控一个连接,这个连接一旦就绪,就要进行处理!如果这个连接描述符在多个线程中都触发了事件,就会存在线程安全问题!因此我们需要将一个连接的事件监控,以及连接事件处理和其他操作都放在同一个线程中进行处理!
后续如果接入了线程池,那么如何保证一个连接的所有操作都在EventLoop所在线程中?在EventLoop()中,添加一个任务队列。对连接的所有操作,都进行一次封装,对连接的操作要当做任务放入任务队列
事件监控 -> 事件处理(放入队列) -> 执行任务
这样可以保证对于连接的所有操作都是在一个线程中执行的,不涉及线程安全问题,是对于任务队列的操作有线程安全问题!只需要给task的操作加一把锁即可!
EventLoop处理流程

成员变量
EventLoop对应线程中执行,保证对连接的各项操作都是线程安全的,可以根据ID判断所执行的操作是不是在所属线程中,如果在当前线程中可以直接执行;如果执行的操作不再线程中,才需要加入到任务池中,等待事件处理完然后执行任务成员函数:

timewheel 与 EventLoop 模块整合操作:
要实现一个完整的秒级定时器,就需要将这两个功能整合到一起:
修改时间轮timewheel模块:
/ EventLoop类 --- Reactor反应堆模型,管理监控连接事件
using Functor = std::function<void()>;
class EventLoop
{
private:
std::thread::id _event_id; // 线程ID
int _eventfd; // eventfd 用于通知事件
std::unique_ptr<Channel> _event_channel; // 管理Event事件的Channel对象
Poller _poller; // epoll模型
std::vector<Functor> _tasks; // 任务池
std::mutex _mtx; // 互斥锁保护线程
// 时间轮
TimeWheel _timer_wheel;
private:
int CreateEventfd()
{
// 禁止进程复制 启动非阻塞读取
int efd = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
LOG(ERROR, "Eventfd Create failed!\n");
return -1;
}
return efd;
}
void ReadEvent()
{
// 从eventfd中读取数据
// 注意每次读取的都是一个8字节数据
uint64_t ret;
int n = ::read(_eventfd, &ret, sizeof(uint64_t));
if (n <= 0)
{
// 被信号打断 表示没有数据
if (errno == EINTR || errno == EAGAIN)
return;
LOG(ERROR, "eventfd recv failed!\n");
::abort();
}
// 读取数据执行任务
return;
}
void WeakUpEventfd()
{
uint64_t val = 1;
int n = ::write(_eventfd, &val, sizeof(uint64_t));
if (n <= 0)
{
if (errno == EINTR)
return;
LOG(ERROR, "eventfd send failed!\n");
::abort();
}
// 读取数据执行任务
return;
}
public:
EventLoop() : _event_id(std::this_thread::get_id()),
_eventfd(CreateEventfd()),
_event_channel(new Channel(this, _eventfd)),
_timer_wheel(this)
{
//_poller = Poller();
// 设置Eventfd的读回调函数
_event_channel->SetReadCallBack(std::bind(&EventLoop::ReadEvent, this));
// 设置读事件监控
_event_channel->EnableRead();
LOG(DEBUG, "EventLoop 构造完成\n");
}
bool IsInLoop()
{
return (_event_id == std::this_thread::get_id());
}
void AssertInLoop()
{
return assert(_event_id == std::this_thread::get_id());
}
void RunInLoop(const Functor &cb)
{
if (IsInLoop())
return cb();
// 否则压入任务池
return QueueLoop(cb);
}
void QueueLoop(const Functor &cb)
{
{
std::unique_lock<std::mutex> lock(_mtx);
_tasks.push_back(cb);
}
// 唤醒有可能因为没有事件就绪导致的epoll阻塞!
WeakUpEventfd();
}
// 添加/修改监控
void UpdateEvent(Channel *channel)
{
return _poller.UpdateEvent(channel);
}
// 移除监控
void RemoveEvent(Channel *channel)
{
return _poller.RemoveEvent(channel);
}
void RunAllTask()
{
std::vector<Functor> tasks;
{
// 进行上锁
std::unique_lock<std::mutex> lock(_mtx);
_tasks.swap(tasks);
}
// 进行执行函数
for (auto &f : tasks)
{
f();
}
return;
}
// 开始监控函数
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(&actives);
// 事件处理(放入队列) 遍历活跃连接,进行事件回调
for (auto &channel : actives)
{
channel->HandleEvent();
}
// 执行任务 执行任务RunAllTask
RunAllTask();
}
}
// 增加时间轮系列接口
void TimerAdd(uint64_t id, int delay, Task_t cb) { _timer_wheel.TimerAdd(id, delay, cb); }
void TimerRefresh(uint64_t id) { _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};这是该项目中最重要的一个模块! 该模块就是对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块进行!
需要管理:
提供功能:
Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的!
当这样的一个场景中:对连接进行操作的时候,但是连接已经被释放了,导致内存访问错误,程序崩溃! 对于这个场景的解决方案:使智能指针share_ptr对Connection进行管理,只有计数为0时才会真正释放!
Connection要继承enable_shared_from_this<Connection>,这样可以方便的快速获取当前对象的shared_ptr指针。
成员变量:
成员函数:
用户接口:
class Connection;
// 核心模块 --- Connection类
typedef enum
{
DISCONNECTED = 0,
CONNECTING,
CONNECTED,
DISCONNETCING
} ConnStatu;
using PtrConn = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
uint64_t _conn_id; // connection连接ID
Socket _socket; // 管理的套接字
int _sockfd; // 套接字fd
EventLoop *_loop; // connection连接关联的EventLoop对象
Any _context; // 上下文数据
Channel _channel; // 管理连接事件
Buffer _in_buffer; // 输入缓冲区 存放Socket中读取的数据
Buffer _out_buffer; // 输出缓冲区 存放要发送给对端的数据
bool _enable_active_release; // 是否开启超时销毁 默认是false
ConnStatu _statu; // Connection连接状态
// 5 个 回调函数 --- 注意使用智能指针 防止在执行任务之前Connection销毁
using ConnectedCallBack = std::function<void(const PtrConn &)>; // 连接时进行的回调函数
using MessageCallBack = std::function<void(const PtrConn &, Buffer *)>; // 处理数据时的回调函数
using ClosedCallBack = std::function<void(const PtrConn &)>; // 关闭连接时的回调函数
using AnyEventCallBack = std::function<void(const PtrConn &)>; // 处理任意事件时的回调函数
ConnectedCallBack _conn_cb; // 连接回调函数类型
MessageCallBack _message_cb; // 处理时回调函数
ClosedCallBack _closed_cb; // 关闭阶段的回调
AnyEventCallBack _event_cb; // 任意事件触发的回调
// 还需要组件内的连接关闭回调 因为服务器组件内会把所有的连接管理起来 一旦某个连接关闭 就应该从管理的地方移除自己的信息!
ClosedCallBack _event_closed_cb;
private:
// 读事件触发的函数
void HandleRead()
{
// 接收Socket数据放到接收缓冲区中
// LOG(DEBUG, "HandleRead\n");
char buf[65536] = {0};
int ret = _socket.NonBlockRecv(buf, 65536);
// 返回值 为 - 1说明读取错误
if (ret < 0)
{
return ShutdownInLoop();
}
// 返回值为0说明没读取到数据
// 将数据写入到缓冲区
_in_buffer.WriteAndPush(buf, ret);
// 然后调用_message_callback回调
if (_in_buffer.ReadAbleSize() > 0)
{
return _message_cb(shared_from_this(), &_in_buffer);
}
}
// 写事件触发的函数
void HandleWrite()
{
// 将输出缓冲区的数据向Socket描述符中进行非阻塞写入
int ret = _socket.NonBlockSend(_out_buffer.ReadPos(), _out_buffer.ReadAbleSize());
// 发送错误就关闭连接
if (ret < 0)
{
// 如果输入缓冲区还有数据,就要进行处理之后再关闭连接
if (_in_buffer.ReadAbleSize() > 0)
_message_cb(shared_from_this(), &_in_buffer);
return ReleaseInLoop();
}
// 缓冲区读偏移向后移动
_out_buffer.MoveReadOffset(ret);
if (_out_buffer.ReadAbleSize() == 0)
{
// 关闭写事件监控
_channel.DisableWrite();
// 如果是待关闭状态就进行关闭
if (_statu == DISCONNETCING)
{
return Release();
}
}
return;
}
// 连接关闭触发的函数
void HandleClose()
{
// 如果还有数据,就进行一次处理
if (_in_buffer.ReadAbleSize() > 0)
{
_message_cb(shared_from_this(), &_in_buffer);
}
// 释放连接
return Release();
}
// 错误事件触发的函数
void HandleError()
{
return HandleClose(); // 直接调用关闭函数
}
void HandleEvent()
{
// 刷新活跃度
if (_enable_active_release == true)
{
_loop->TimerRefresh(_conn_id);
}
if (_event_cb)
_event_cb(shared_from_this());
}
// 发送函数 不是直接进行发送 而是将数据拷贝到输出缓冲区 然后启动写事件监控
void SendInLoop(const char *data, size_t len)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.WriteAndPush(data, len);
if (_channel.Writeable() == false)
{
_channel.EnableWrite();
}
}
// 这个关闭操作不是真正的关闭函数 而是腰判断是否还有时间要进行处理
void ShutdownInLoop()
{
_statu = DISCONNETCING; // 状态设置为待关闭状态
// 如果输入缓冲区有数据 ,要进行一次处理
if (_in_buffer.ReadAbleSize() > 0)
{
if (_message_cb)
_message_cb(shared_from_this(), &_in_buffer);
}
// 如果输出缓存区有数据,就要启动写事件监控
if (_out_buffer.ReadAbleSize() > 0)
{
if (_channel.Writeable() == false)
_channel.EnableWrite();
}
// 如果输出缓冲区数据没有 直接进行关闭
if (_out_buffer.ReadAbleSize() == 0)
{
Release();
}
}
void EnableInactiveReleaseInLoop(int sec)
{
// LOG(DEBUG, "EnableInactiveReleaseInLoop %d s\n", sec);
// 将超时销毁标志位设置为true
_enable_active_release = true;
// 如果已经有超时任务,那么就进行一次刷新
if (_loop->HasTimer(_conn_id))
{
return _loop->TimerRefresh(_conn_id);
}
// 没有就进行添加
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
void CancelInactiveReleaseInLoop()
{
// 将超时销毁标志位设置为false
_enable_active_release = false;
// 如果有超时任务才进行取消
if (_loop->HasTimer(_conn_id))
{
_loop->TimerCancel(_conn_id);
}
}
void UpgradeInLoop(const Any &context,
const ConnectedCallBack &conn_cb,
const MessageCallBack &mess_cb,
const ClosedCallBack &closed_cb,
const AnyEventCallBack &event_cb)
{
_context = context;
_conn_cb = conn_cb;
_message_cb = mess_cb;
_closed_cb = closed_cb;
_event_cb = event_cb;
}
// 真正的关闭函数
void ReleaseInLoop()
{
LOG(INFO, "Realse Connection:%p\n", shared_from_this());
// 1. 修改连接状态
_statu = DISCONNECTED;
// 2. 移除所有事件监控
_channel.Remove();
// 3. 关闭描述符
_socket.Close();
// 4. 取消定时任务
if (_loop->HasTimer(_conn_id))
CancelInactiveReleaseInLoop();
// 5. 执行用户设置的关闭回调
if (_closed_cb)
_closed_cb(shared_from_this());
// 6. 执行组件内的关闭回调
if (_event_closed_cb)
_event_closed_cb(shared_from_this());
}
// 连接获取之后所处的状态下要进行各种设置 启动读事件监控 调用回调函数
void EstablishedInLoop()
{
// LOG(DEBUG, "EstablishedInLoop()\n");
// 必须是连接中状态才执行
assert(_statu == CONNECTING);
_statu = CONNECTED; // 1.执行完 修改为已连接状态
_channel.EnableRead(); // 2.启动读事件监控
if (_conn_cb)
_conn_cb(shared_from_this()); // 3. 调用连接回调函数
}
public:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(_conn_id),
_socket(sockfd),
_sockfd(sockfd),
_loop(loop),
_channel(_loop, _sockfd),
_enable_active_release(false),
_statu(CONNECTING)
{
// 设置channel回调函数
_channel.SetCloseCallBack(std::bind(&Connection::HandleClose, this));
_channel.SetErrorCallBack(std::bind(&Connection::HandleError, this));
_channel.SetWriteCallBack(std::bind(&Connection::HandleWrite, this));
_channel.SetEventCallBack(std::bind(&Connection::HandleEvent, this));
_channel.SetReadCallBack(std::bind(&Connection::HandleRead, this));
}
~Connection()
{
LOG(INFO, "Release Connection :%p", this);
}
// 基础接口
uint64_t Id() { return _conn_id; } // 返回Connection的id
int Fd() { return _sockfd; } // 返回套接字描述符
void SetContext(const Any &context) { _context = context; } // 设置上下文
Any *GetContext() { return &_context; } // 获取上下文
bool Connected() { return (_statu == CONNECTED); } // 判断是否处于连接状态!
// 设置回调函数
void SetConnectCB(const ConnectedCallBack &cb) { _conn_cb = cb; }
void SetMessageCB(const MessageCallBack &cb) { _message_cb = cb; }
void SetClosedCB(const ClosedCallBack &cb) { _closed_cb = cb; }
void SetAnyEventCB(const AnyEventCallBack &cb) { _event_cb = cb; }
void SetSvrClosedCB(const ClosedCallBack &cb) { _event_closed_cb = cb; }
// 发送数据
void Send(const char *data, size_t len) { _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, data, len)); }
// 关闭连接 --- 提供给用户的关闭,_不是真正的关闭连接 , 需要判断有没有数据待处理
void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); }
// 启动超时销毁
void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); }
// 取消超时销毁
void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); }
// 进行channel回调设置
void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); }
void Release()
{
// LOG(DEBUG, "Release()\n");
_loop->QueueLoop(std::bind(&Connection::ReleaseInLoop, this));
}
// 切换协议
void Upgrade(const Any &context,
const ConnectedCallBack &conn_cb,
const MessageCallBack &mess_cb,
const ClosedCallBack &closed_cb,
const AnyEventCallBack &event_cb)
{
// 切换协议 --- 重置上下文以及阶段性回调处理函数 这个函数必须在EventLoop中立刻执行
// 预防新事件触发后 ,处理时还是原先的协议! --- 导致数据处理异常
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn_cb, mess_cb, closed_cb, event_cb));
}
};专门对监听套接字进行管理的类
该模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心!
成员变量 1. 套接字对象:Socket 用于创建监听套接字 2. EventLoop* _loop :对监听套接字进行事件管理 3. Channel _channel :用于对监听套接字进行事件管理 4. 新连接获取之后的回调函数 AcceptCallBack AcceptCallBack _accept_callback;
成员函数 1. 构造函数 2. 监听套接字读事件回调函数 — 调用 _accept_callback,进行新连接处理 3. 创建套接字 返回描述符。
class Acceptor
{
private:
Socket _socket; // 套接字对象
EventLoop *_loop; // 对监听套接字进行事件监控
Channel _channel; // 用于对今天套接的事件管理
using AcceptCallBack = std::function<void(int)>;
AcceptCallBack _accept_callback;
int CreateSocket(int port)
{
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.Sockfd();
}
// 读事件回调函数
void HandleRead()
{
// 获取新连接
int newfd = _socket.Accept();
if (newfd < 0)
{
LOG(ERROR, "Accept failed\n");
return;
}
if (_accept_callback)
_accept_callback(newfd);
}
public:
Acceptor(EventLoop *loop, int port) : _socket(CreateSocket(port)), _loop(loop), _channel(_loop, _socket.Sockfd())
{
_channel.SetReadCallBack(std::bind(&Acceptor::HandleRead, this));
// 开启读事件监控!
//_channel.EnableRead();
}
~Acceptor()
{
_socket.Close();
}
void SetAcceptCallBack(const AcceptCallBack &cb)
{
_accept_callback = cb;
}
void Listen()
{
// 开启读事件监控!
_channel.EnableRead();
}
};现在,服务器模块基本实现!